This is an automated email from the ASF dual-hosted git repository.
snlee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9d9c9efb60 Fixes backward incompatibility with
SegmentGenerationJobSpec for segment push job runners (#10645)
9d9c9efb60 is described below
commit 9d9c9efb607fb02d9a8401d48f9c360749e50da9
Author: Benson Yuan <[email protected]>
AuthorDate: Thu Apr 20 17:29:10 2023 -0700
Fixes backward incompatibility with SegmentGenerationJobSpec for segment
push job runners (#10645)
---
.../batch/common/BaseSegmentPushJobRunner.java | 9 ++++----
.../batch/standalone/SegmentTarPushJobRunner.java | 24 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 5 deletions(-)
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/BaseSegmentPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/BaseSegmentPushJobRunner.java
index e855e2ba39..821e42d256 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/BaseSegmentPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-common/src/main/java/org/apache/pinot/plugin/ingestion/batch/common/BaseSegmentPushJobRunner.java
@@ -65,12 +65,11 @@ public abstract class BaseSegmentPushJobRunner implements
IngestionJobRunner {
}
// Read Table config
- if (_spec.getTableSpec().getTableConfigURI() == null) {
- throw new RuntimeException("Missing property 'tableConfigURI' in
'tableSpec'");
+ if (_spec.getTableSpec().getTableConfigURI() != null) {
+ _tableConfig =
+
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(),
spec.getAuthToken());
+ _consistentPushEnabled =
ConsistentDataPushUtils.consistentDataPushEnabled(_tableConfig);
}
-
- _tableConfig =
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(),
spec.getAuthToken());
- _consistentPushEnabled =
ConsistentDataPushUtils.consistentDataPushEnabled(_tableConfig);
}
/**
diff --git
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
index fa549d100b..7bec7635ed 100644
---
a/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
+++
b/pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-standalone/src/main/java/org/apache/pinot/plugin/ingestion/batch/standalone/SegmentTarPushJobRunner.java
@@ -20,7 +20,9 @@ package org.apache.pinot.plugin.ingestion.batch.standalone;
import java.util.ArrayList;
import java.util.Map;
+import org.apache.pinot.common.segment.generation.SegmentGenerationUtils;
import org.apache.pinot.plugin.ingestion.batch.common.BaseSegmentPushJobRunner;
+import org.apache.pinot.segment.local.utils.ConsistentDataPushUtils;
import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec;
import org.apache.pinot.spi.utils.retry.AttemptsExceededException;
@@ -35,6 +37,28 @@ public class SegmentTarPushJobRunner extends
BaseSegmentPushJobRunner {
init(spec);
}
+ /**
+ * Initialize SegmentTarPushJobRunner with SegmentGenerationJobSpec
+ * Checks for required parameters in the spec and enablement of consistent
data push.
+ * This overrides the init method in BaseSegmentPushJobRunner as the push
job spec is required in the base class.
+ */
+ @Override
+ public void init(SegmentGenerationJobSpec spec) {
+ _spec = spec;
+
+ // Read Table spec
+ if (_spec.getTableSpec() == null) {
+ throw new RuntimeException("Missing tableSpec");
+ }
+
+ // Read Table config
+ if (_spec.getTableSpec().getTableConfigURI() != null) {
+ _tableConfig =
+
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(),
spec.getAuthToken());
+ _consistentPushEnabled =
ConsistentDataPushUtils.consistentDataPushEnabled(_tableConfig);
+ }
+ }
+
public void uploadSegments(Map<String, String> segmentsUriToTarPathMap)
throws AttemptsExceededException, RetriableOperationException {
SegmentPushUtils.pushSegments(_spec, _outputDirFS, new
ArrayList<>(segmentsUriToTarPathMap.values()));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]