swaminathanmanish commented on code in PR #14139:
URL: https://github.com/apache/pinot/pull/14139#discussion_r1793243424
##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java:
##########
@@ -106,28 +192,123 @@ public void run() {
} else {
JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush,
pushParallelism);
- URI finalOutputDirURI = outputDirURI;
- // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
- // instead.
- pathRDD.foreach(new VoidFunction<String>() {
- @Override
- public void call(String segmentTarPath)
- throws Exception {
- PluginManager.get().init();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory
- .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
+
+ if (_spec.getPushJobSpec().isBatchSegmentUpload()) {
+ // Process segments in batch mode using foreachPartition
+ pathRDD.foreachPartition(new VoidFunction<Iterator<String>>() {
+ @Override
+ public void call(Iterator<String> segmentIterator) throws Exception {
+ PluginManager.get().init();
+ setupFileSystems();
+
+ List<String> segmentsInPartition = new ArrayList<>();
+ segmentIterator.forEachRemaining(segmentsInPartition::add);
+
+ try {
+ Map<String, String> segmentUriToTarPathMap =
+ SegmentPushUtils.getSegmentUriToTarPathMap(outputDirURI,
_spec.getPushJobSpec(),
+ segmentsInPartition.toArray(new String[0]));
+ SegmentPushUtils.sendSegmentUriAndMetadata(_spec,
PinotFSFactory.create(outputDirURI.getScheme()),
+ segmentUriToTarPathMap);
+ } catch (RetriableOperationException | AttemptsExceededException
e) {
+ throw new RuntimeException(e);
+ }
}
- try {
- Map<String, String> segmentUriToTarPathMap = SegmentPushUtils
- .getSegmentUriToTarPathMap(finalOutputDirURI,
_spec.getPushJobSpec(), new String[]{segmentTarPath});
- SegmentPushUtils.sendSegmentUriAndMetadata(_spec,
PinotFSFactory.create(finalOutputDirURI.getScheme()),
- segmentUriToTarPathMap);
- } catch (RetriableOperationException | AttemptsExceededException e) {
- throw new RuntimeException(e);
+ });
+ } else {
+ // Process segments one by one using foreach
+ pathRDD.foreach(new VoidFunction<String>() {
+ @Override
+ public void call(String segmentTarPath) throws Exception {
Review Comment:
Do we have tests for this change, i.e. the non-consistent-push batch upload code path?
##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java:
##########
@@ -41,16 +50,45 @@
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.VoidFunction;
+import org.apache.spark.scheduler.JobFailed;
+import org.apache.spark.scheduler.JobResult;
+import org.apache.spark.scheduler.SparkListener;
+import org.apache.spark.scheduler.SparkListenerJobEnd;
public class SparkSegmentMetadataPushJobRunner implements IngestionJobRunner,
Serializable {
- private SegmentGenerationJobSpec _spec;
+ // This listener is added to the SparkContext and is executed when the Spark
job fails.
+ // It handles the failure by calling
ConsistentDataPushUtils.handleUploadException.
+ // The listener is only added if consistent data push is enabled and the
pushParallelism is greater than 1.
+ // This listener is not a required part of the implementation, as the start
replace segments protocol
+ // will cleanup past failures as part of the fresh consistent data push, but
it's still cleaner to handle
+ // the failure as soon as possible.
+ private static class ConsistentDataPushFailureHandler extends SparkListener {
+ private final SegmentGenerationJobSpec _spec;
+ private final Map<URI, String> _uriToLineageEntryIdMap;
- public SparkSegmentMetadataPushJobRunner() {
+ public ConsistentDataPushFailureHandler(SegmentGenerationJobSpec spec,
Map<URI, String> uriToLineageEntryIdMap) {
Review Comment:
Do we have tests for this listener code?
##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java:
##########
@@ -106,28 +192,123 @@ public void run() {
} else {
JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush,
pushParallelism);
- URI finalOutputDirURI = outputDirURI;
- // Prevent using lambda expression in Spark to avoid potential
serialization exceptions, use inner function
- // instead.
- pathRDD.foreach(new VoidFunction<String>() {
- @Override
- public void call(String segmentTarPath)
- throws Exception {
- PluginManager.get().init();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory
- .register(pinotFSSpec.getScheme(), pinotFSSpec.getClassName(),
new PinotConfiguration(pinotFSSpec));
+
+ if (_spec.getPushJobSpec().isBatchSegmentUpload()) {
Review Comment:
Clarification:
Is there a reason why we have this if/else check on
_spec.getPushJobSpec().isBatchSegmentUpload()? We basically call the same
code whether batch or non-batch, right?
Is the difference that foreachPartition applies the function to the partition
itself (i.e., N segments per partition based on pushParallelism), whereas
foreach goes through each element?
##########
pinot-plugins/pinot-batch-ingestion/pinot-batch-ingestion-spark-3/src/main/java/org/apache/pinot/plugin/ingestion/batch/spark3/SparkSegmentMetadataPushJobRunner.java:
##########
@@ -59,43 +97,91 @@ public void init(SegmentGenerationJobSpec spec) {
}
@Override
- public void run() {
- //init all file systems
- List<PinotFSSpec> pinotFSSpecs = _spec.getPinotFSSpecs();
- for (PinotFSSpec pinotFSSpec : pinotFSSpecs) {
- PinotFSFactory.register(pinotFSSpec.getScheme(),
pinotFSSpec.getClassName(), new PinotConfiguration(pinotFSSpec));
- }
+ public void run()
+ throws Exception {
+ // Initialize all file systems
+ setupFileSystems();
- //Get outputFS for writing output pinot segments
- URI outputDirURI;
- try {
- outputDirURI = new URI(_spec.getOutputDirURI());
- if (outputDirURI.getScheme() == null) {
- outputDirURI = new File(_spec.getOutputDirURI()).toURI();
- }
- } catch (URISyntaxException e) {
- throw new RuntimeException("outputDirURI is not valid - '" +
_spec.getOutputDirURI() + "'");
- }
+ // Get outputFS for writing output pinot segments
+ URI outputDirURI = validateOutputDirURI(_spec.getOutputDirURI());
PinotFS outputDirFS = PinotFSFactory.create(outputDirURI.getScheme());
- //Get list of files to process
- String[] files;
- try {
- files = outputDirFS.listFiles(outputDirURI, true);
- } catch (IOException e) {
- throw new RuntimeException("Unable to list all files under outputDirURI
- '" + outputDirURI + "'");
- }
- List<String> segmentsToPush = new ArrayList<>();
- for (String file : files) {
- if (file.endsWith(Constants.TAR_GZ_FILE_EXT)) {
- segmentsToPush.add(file);
- }
- }
+ // Collect files to process
+ List<String> segmentsToPush = getSegmentsToPush(outputDirFS, outputDirURI);
+
+ // Ensure tableConfigURI is set if missing
+ setupTableConfigURI();
+
+ // Retrieve table config and check for consistent push
+ TableConfig tableConfig =
+
SegmentGenerationUtils.getTableConfig(_spec.getTableSpec().getTableConfigURI(),
_spec.getAuthToken());
+ boolean consistentPushEnabled =
ConsistentDataPushUtils.consistentDataPushEnabled(tableConfig);
+ // Determine push parallelism
int pushParallelism = _spec.getPushJobSpec().getPushParallelism();
if (pushParallelism < 1) {
pushParallelism = segmentsToPush.size();
}
+
+ // Call the appropriate push helper
+ if (consistentPushEnabled) {
+ handleConsistentPush(segmentsToPush, outputDirURI, pushParallelism);
+ } else {
+ handleNonConsistentPush(segmentsToPush, outputDirFS, outputDirURI,
pushParallelism);
+ }
+ }
+
+ private void handleConsistentPush(List<String> segmentsToPush, URI
outputDirURI, int pushParallelism)
+ throws Exception {
+ Map<String, String> segmentsToPushUriToTarPathMap =
+ SegmentPushUtils.getSegmentUriToTarPathMap(outputDirURI,
_spec.getPushJobSpec(),
+ segmentsToPush.toArray(new String[0]));
+ Map<URI, String> uriToLineageEntryIdMap =
+ ConsistentDataPushUtils.preUpload(_spec,
getSegmentsToReplace(segmentsToPushUriToTarPathMap));
+
+ if (pushParallelism == 1) {
+ // Single push
+ SegmentPushUtils.sendSegmentUriAndMetadata(_spec,
PinotFSFactory.create(outputDirURI.getScheme()),
+ segmentsToPushUriToTarPathMap);
+ } else {
+ // Parallel push using Spark
+ JavaSparkContext sparkContext =
JavaSparkContext.fromSparkContext(SparkContext.getOrCreate());
+ sparkContext.sc().addSparkListener(new
ConsistentDataPushFailureHandler(_spec, uriToLineageEntryIdMap));
+ JavaRDD<String> pathRDD = sparkContext.parallelize(segmentsToPush,
pushParallelism);
+
+ if (_spec.getPushJobSpec().isBatchSegmentUpload()) {
+ // Process segments in batch mode using foreachPartition
+ pathRDD.foreachPartition(segmentIterator -> {
+ setupFileSystems();
Review Comment:
Does setupFileSystems need to happen for every iteration or can it be
outside?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]