This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d09c0ad Refactoring SegmentGenerationAndPushTask for extensibility
(#7188)
d09c0ad is described below
commit d09c0add495a6d38b29126f663879b59cfbaab43
Author: Tim Santos <[email protected]>
AuthorDate: Fri Jul 23 18:13:21 2021 -0700
Refactoring SegmentGenerationAndPushTask for extensibility (#7188)
Changes the `generateTaskSpec` method to be `protected` so that it can be
overridden by a subclass.
---
.../SegmentGenerationAndPushTaskExecutor.java | 42 ++++++++++++----------
1 file changed, 24 insertions(+), 18 deletions(-)
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
index 0eea8ee..4487429 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/segment_generation_and_push/SegmentGenerationAndPushTaskExecutor.java
@@ -110,31 +110,37 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
SegmentGenerationAndPushResult.Builder resultBuilder = new
SegmentGenerationAndPushResult.Builder();
File localTempDir = new File(new
File(MinionContext.getInstance().getDataDir(),
"SegmentGenerationAndPushResult"),
"tmp-" + UUID.randomUUID());
-
try {
- // Generate Pinot Segment
SegmentGenerationTaskSpec taskSpec = generateTaskSpec(taskConfigs,
localTempDir);
- SegmentGenerationTaskRunner taskRunner = new
SegmentGenerationTaskRunner(taskSpec);
- String segmentName = taskRunner.run();
-
- // Tar segment directory to compress file
- File localSegmentTarFile = tarSegmentDir(taskSpec, segmentName);
-
- //move segment to output PinotFS
- URI outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs,
localSegmentTarFile);
- LOGGER.info("Moved generated segment from [{}] to location: [{}]",
localSegmentTarFile, outputSegmentTarURI);
-
- resultBuilder.setSegmentName(segmentName);
- // Segment push task
- // TODO: Make this use SegmentUploader
- pushSegment(taskSpec.getTableConfig().getTableName(), taskConfigs,
outputSegmentTarURI);
- resultBuilder.setSucceed(true);
+ return generateAndPushSegment(taskSpec, resultBuilder, taskConfigs);
} catch (Exception e) {
throw new RuntimeException("Failed to execute
SegmentGenerationAndPushTask", e);
} finally {
// Cleanup output dir
FileUtils.deleteQuietly(localTempDir);
}
+ }
+
+ private SegmentGenerationAndPushResult
generateAndPushSegment(SegmentGenerationTaskSpec taskSpec,
+ SegmentGenerationAndPushResult.Builder resultBuilder,
+ Map<String, String> taskConfigs) throws Exception {
+ // Generate Pinot Segment
+ SegmentGenerationTaskRunner taskRunner = new
SegmentGenerationTaskRunner(taskSpec);
+ String segmentName = taskRunner.run();
+
+ // Tar segment directory to compress file
+ File localSegmentTarFile = tarSegmentDir(taskSpec, segmentName);
+
+ //move segment to output PinotFS
+ URI outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs,
localSegmentTarFile);
+ LOGGER.info("Moved generated segment from [{}] to location: [{}]",
localSegmentTarFile, outputSegmentTarURI);
+
+ resultBuilder.setSegmentName(segmentName);
+ // Segment push task
+ // TODO: Make this use SegmentUploader
+ pushSegment(taskSpec.getTableConfig().getTableName(), taskConfigs,
outputSegmentTarURI);
+ resultBuilder.setSucceed(true);
+
return resultBuilder.build();
}
@@ -242,7 +248,7 @@ public class SegmentGenerationAndPushTaskExecutor extends
BaseTaskExecutor {
return localSegmentTarFile;
}
- private SegmentGenerationTaskSpec generateTaskSpec(Map<String, String>
taskConfigs, File localTempDir)
+ protected SegmentGenerationTaskSpec generateTaskSpec(Map<String, String>
taskConfigs, File localTempDir)
throws Exception {
SegmentGenerationTaskSpec taskSpec = new SegmentGenerationTaskSpec();
URI inputFileURI =
URI.create(taskConfigs.get(BatchConfigProperties.INPUT_DATA_FILE_URI_KEY));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]