priyen commented on code in PR #14139:
URL: https://github.com/apache/pinot/pull/14139#discussion_r1792188869
##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java:
##########
@@ -59,43 +97,91 @@ public void init(SegmentGenerationJobSpec spec) {
}
@Override
- public void run() {
- //init all file systems
- List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory.register(pinotFSSpec.getScheme(),
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
+ public void run()
+ throws Exception {
+ // Initialize all file systems
+ setupFileSystems();
- //Get outputFS for writing output pinot segments
- URI outputDirURI;
- try {
- outputDirURI = new URI(_spec.getOutputDirURI());
- if (outputDirURI.getScheme() == null) {
- outputDirURI = new File(_spec.getOutputDirURI()).toURI();
- }
- } catch (URISyntaxException e) {
- throw new RuntimeException("outputDirURI is not valid - '" +
_spec.getOutputDirURI() + "'");
- }
+ // Get outputFS for writing output pinot segments
+ URI outputDirURI = validateOutputDirURI(_spec.getOutputDirURI());
PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
- //Get list of files to process
- String[] files;
- try {
- files = outputDirFS.listFiles(outputDirURI, true);
- } catch (IOException e) {
- throw new RuntimeException("Unable to list all files under outputDirURI
- '" + outputDirURI + "'");
- }
- List<String> segmentsToPush = new ArrayList<>();
- for (String file : files) {
- if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
- segmentsToPush.add(file);
- }
- }
+ // Collect files to process
+ List<String> segmentsToPush = getSegmentsToPush(outputDirFS, outputDirURI);
+
+ // Ensure tableConfigURI is set if missing
+ setupTableConfigURI();
+
+ // Retrieve table config and check for consistent push
+ TableConfig tableConfig =
+
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(),
_spec.getAuthToken());
+ boolean consistentPushEnabled =
ConsistentDataPushUtils.consistentDataPushEnabled(tableConfig);
+ // Determine push parallelism
int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
if (pushParallelism < 1) {
pushParallelism = segmentsToPush.size();
}
+
+ // Call the appropriate push helper
+ if (consistentPushEnabled) {
+ handleConsistentPush(segmentsToPush, outputDirURI, pushParallelism);
+ } else {
+ handleNonConsistentPush(segmentsToPush, outputDirFS, outputDirURI,
pushParallelism);
+ }
+ }
+
+ private void handleConsistentPush(List<String> segmentsToPush, URI
outputDirURI, int pushParallelism)
+ throws Exception {
+ Map<String, String> segmentsToPushUriToTarPathMap =
+ SegmentPushUtils.getSegmentUriToTarPathMap(outputDirURI,
_spec.getPushJobSpec(),
+ segmentsToPush.toArray(new String[0]));
+ Map<URI, String> uriToLineageEntryIdMap =
Review Comment:
Yep, it is a bit odd. This is the current behaviour in master, both for this
job and for the standalone job.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]