This is an automated email from the ASF dual-hosted git repository.

snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 9d9c9efb60 Fixes backward incompatibility with 
SegmentGenerationJobSpec for segment push job runners (#10645)
9d9c9efb60 is described below

commit 9d9c9efb607fb02d9a8401d48f9c360749e50da9
Author: Benson Yuan <[email protected]>
AuthorDate: Thu Apr 20 17:29:10 2023 -0700

    Fixes backward incompatibility with SegmentGenerationJobSpec for segment 
push job runners (#10645)
---
 .../batch/common/BaseSegmentPushJobRunner.java     |  9 ++++----
 .../batch/standalone/SegmentTarPushJobRunner.java  | 24 ++++++++++++++++++++++
 2 files changed, 28 insertions(+), 5 deletions(-)

diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/BaseSegmentPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/BaseSegmentPushJobRunner.java
index e855e2ba39..821e42d256 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/BaseSegmentPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/BaseSegmentPushJobRunner.java
@@ -65,12 +65,11 @@ public abstract class BaseSegmentPushJobRunner implements 
IngestionJobRunner {
     }
 
     // Read Table config
-    if (_spec.getTableSpec().getTableConfigURI() == null) {
-      throw new RuntimeException("Missing property 'tableConfigURI' in 
'tableSpec'");
+    if (_spec.getTableSpec().getTableConfigURI() != null) {
+      _tableConfig =
+          
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), 
spec.getAuthToken());
+      _consistentPushEnabled = 
ConsistentDataPushUtils.consistentDataPushEnabled(_tableConfig);
     }
-
-    _tableConfig = 
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), 
spec.getAuthToken());
-    _consistentPushEnabled = 
ConsistentDataPushUtils.consistentDataPushEnabled(_tableConfig);
   }
 
   /**
diff --git 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
index fa549d100b..7bec7635ed 100644
--- 
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
+++ 
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
@@ -20,7 +20,9 @@ package org.apache.pinot.plugin.ingestion.batch.standalone;
 
 import java.util.ArrayList;
 import java.util.Map;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
 import org.apache.pinot.plugin.ingestion.batch.common.BaseSegmentPushJobRunner;
+import org.apache.pinot.segment.local.utils.ConsistentDataPushUtils;
 import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
 import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
@@ -35,6 +37,28 @@ public class SegmentTarPushJobRunner extends 
BaseSegmentPushJobRunner {
     init(spec);
   }
 
+  /**
+   * Initialize SegmentTarPushJobRunner with SegmentGenerationJobSpec
+   * Checks for required parameters in the spec and enablement of consistent 
data push.
+   * This overrides the init method in BaseSegmentPushJobRunner as the push 
job spec is required in the base class.
+   */
+  @Override
+  public void init(SegmentGenerationJobSpec spec) {
+    _spec = spec;
+
+    // Read Table spec
+    if (_spec.getTableSpec() == null) {
+      throw new RuntimeException("Missing tableSpec");
+    }
+
+    // Read Table config
+    if (_spec.getTableSpec().getTableConfigURI() != null) {
+      _tableConfig =
+          
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(), 
spec.getAuthToken());
+      _consistentPushEnabled = 
ConsistentDataPushUtils.consistentDataPushEnabled(_tableConfig);
+    }
+  }
+
   public void uploadSegments(Map<String, String> segmentsUriToTarPathMap)
       throws AttemptsExceededException, RetriableOperationException {
     SegmentPushUtils.pushSegments(_spec, _outputDirFS, new 
ArrayList<>(segmentsUriToTarPathMap.values()));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to