priyen commented on code in PR #14139:
URL: https://github.com/apache/pinot/pull/14139#discussion_r1793669529
##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java:
##########
@@ -59,43 +97,91 @@ public void init(SegmentGenerationJobSpec spec) {
}
@Override
- public void run() {
- //init all file systems
- List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory.register(pinotFSSpec.getScheme(),
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
+ public void run()
+ throws Exception {
+ // Initialize all file systems
+ setupFileSystems();
- //Get outputFS for writing output pinot segments
- URI outputDirURI;
- try {
- outputDirURI = new URI(_spec.getOutputDirURI());
- if (outputDirURI.getScheme() == null) {
- outputDirURI = new File(_spec.getOutputDirURI()).toURI();
- }
- } catch (URISyntaxException e) {
- throw new RuntimeException("outputDirURI is not valid - '" +
_spec.getOutputDirURI() + "'");
- }
+ // Get outputFS for writing output pinot segments
+ URI outputDirURI = validateOutputDirURI(_spec.getOutputDirURI());
PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
- //Get list of files to process
- String[] files;
- try {
- files = outputDirFS.listFiles(outputDirURI, true);
- } catch (IOException e) {
- throw new RuntimeException("Unable to list all files under outputDirURI
- '" + outputDirURI + "'");
- }
- List<String> segmentsToPush = new ArrayList<>();
- for (String file : files) {
- if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
- segmentsToPush.add(file);
- }
- }
+ // Collect files to process
+ List<String> segmentsToPush = getSegmentsToPush(outputDirFS, outputDirURI);
+
+ // Ensure tableConfigURI is set if missing
+ setupTableConfigURI();
+
+ // Retrieve table config and check for consistent push
+ TableConfig tableConfig =
+
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(),
_spec.getAuthToken());
+ boolean consistentPushEnabled =
ConsistentDataPushUtils.consistentDataPushEnabled(tableConfig);
+ // Determine push parallelism
int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
if (pushParallelism < 1) {
pushParallelism = segmentsToPush.size();
}
+
+ // Call the appropriate push helper
+ if (consistentPushEnabled) {
+ handleConsistentPush(segmentsToPush, outputDirURI, pushParallelism);
+ } else {
+ handleNonConsistentPush(segmentsToPush, outputDirFS, outputDirURI,
pushParallelism);
+ }
+ }
+
+ private void handleConsistentPush(List<String> segmentsToPush, URI
outputDirURI, int pushParallelism)
+ throws Exception {
+ Map<String, String> segmentsToPushUriToTarPathMap =
+ SegmentPushUtils.getSegmentUriToTarPathMap(outputDirURI,
_spec.getPushJobSpec(),
+ segmentsToPush.toArray(new String[0]));
+ Map<URI, String> uriToLineageEntryIdMap =
+ ConsistentDataPushUtils.preUpload(_spec,
getSegmentsToReplace(segmentsToPushUriToTarPathMap));
+
+ if (pushParallelism == 1) {
+ // Single push
+ SegmentPushUtils.sendSegmentUriAndMetadata(_spec,
PinotFSFactory.create(outputDirURI.getScheme()),
+ segmentsToPushUriToTarPathMap);
+ } else {
+ // Parallel push using Spark
+ JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+ sparkContext.sc().addSparkListener(new
ConsistentDataPushFailureHandler(_spec, uriToLineageEntryIdMap));
+ JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush,
pushParallelism);
+
+ if (_spec.getPushJobSpec().isBatchSegmentUpload()) {
+ // Process segments in batch mode using foreachPartition
+ pathRDD.foreachPartition(segmentIterator -> {
+ setupFileSystems();
Review Comment:
Yes, it needs to, since it runs inside each separate executor, and each
executor has its own instantiation.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]