This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new d9b4723187 Revert "Using local copy of segment instead of downloading 
from remote (#14429)" (#14484)
d9b4723187 is described below

commit d9b47231872e86f1d9217fdbb034118b48b207c6
Author: Ragesh Rajagopalan <[email protected]>
AuthorDate: Mon Nov 18 16:22:26 2024 -0800

    Revert "Using local copy of segment instead of downloading from remote 
(#14429)" (#14484)
    
    This reverts commit b90d1da2edb82e92f43c873edf93d1f32dceee40.
---
 .../BaseMultipleSegmentsConversionExecutor.java    | 33 +++++++++++++---------
 1 file changed, 19 insertions(+), 14 deletions(-)

diff --git 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 11b0116429..19d145ce3d 100644
--- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++ 
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -55,7 +55,6 @@ import org.apache.pinot.segment.local.utils.SegmentPushUtils;
 import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
 import org.apache.pinot.spi.auth.AuthProvider;
 import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.filesystem.LocalPinotFS;
 import org.apache.pinot.spi.filesystem.PinotFS;
 import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
 import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -80,7 +79,6 @@ import org.slf4j.LoggerFactory;
 public abstract class BaseMultipleSegmentsConversionExecutor extends 
BaseTaskExecutor {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
   private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID = 
"lineageEntryId";
-  private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();
 
   private static final int DEFUALT_PUSH_ATTEMPTS = 5;
   private static final int DEFAULT_PUSH_PARALLELISM = 1;
@@ -286,11 +284,14 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
             String.format("Uploading segment: %s (%d out of %d)", 
resultSegmentName, (i + 1), numOutputSegments));
         String pushMode = 
taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE,
             BatchConfigProperties.SegmentPushType.TAR.name());
+        URI outputSegmentTarURI;
         if 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
             != BatchConfigProperties.SegmentPushType.TAR) {
-          URI outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, 
convertedTarredSegmentFile);
+          outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, 
convertedTarredSegmentFile);
           LOGGER.info("Moved generated segment from [{}] to location: [{}]", 
convertedTarredSegmentFile,
               outputSegmentTarURI);
+        } else {
+          outputSegmentTarURI = convertedTarredSegmentFile.toURI();
         }
 
         // Set segment ZK metadata custom map modifier into HTTP header to 
modify the segment ZK metadata
@@ -315,12 +316,11 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
         }
 
         if (batchSegmentUpload) {
-          updateSegmentUriToTarPathMap(taskConfigs, 
convertedTarredSegmentFile.toURI(), segmentConversionResult,
+          updateSegmentUriToTarPathMap(taskConfigs, outputSegmentTarURI, 
segmentConversionResult,
               segmentUriToTarPathMap, pushJobSpec);
         } else {
           String rawTableName = 
TableNameBuilder.extractRawTableName(tableNameWithType);
-          pushSegment(rawTableName, taskConfigs, 
convertedTarredSegmentFile.toURI(), httpHeaders, parameters,
-              segmentConversionResult);
+          pushSegment(rawTableName, taskConfigs, outputSegmentTarURI, 
httpHeaders, parameters, segmentConversionResult);
           if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
             LOGGER.warn("Failed to delete tarred converted segment: {}", 
convertedTarredSegmentFile.getAbsolutePath());
           }
@@ -438,15 +438,18 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
     List<Header> headers = getSegmentPushCommonHeaders(pinotTaskConfig, 
authProvider, segmentConversionResults);
     List<NameValuePair> parameters = 
getSegmentPushCommonParams(tableNameWithType);
 
-    SegmentPushUtils.sendSegmentsUriAndMetadata(spec, LOCAL_PINOT_FS, 
segmentUriToTarPathMap, headers, parameters);
+    URI outputSegmentDirURI = 
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+    try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, 
outputSegmentDirURI)) {
+      SegmentPushUtils.sendSegmentsUriAndMetadata(spec, outputFileFS, 
segmentUriToTarPathMap, headers, parameters);
+    }
   }
 
-  private void pushSegment(String tableName, Map<String, String> taskConfigs, 
URI localSegmentTarURI,
+  private void pushSegment(String tableName, Map<String, String> taskConfigs, 
URI outputSegmentTarURI,
       List<Header> headers, List<NameValuePair> parameters, 
SegmentConversionResult segmentConversionResult)
       throws Exception {
     String pushMode =
         taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, 
BatchConfigProperties.SegmentPushType.TAR.name());
-    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", 
pushMode, localSegmentTarURI);
+    LOGGER.info("Trying to push Pinot segment with push mode {} from {}", 
pushMode, outputSegmentTarURI);
 
     PushJobSpec pushJobSpec = new PushJobSpec();
     pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
@@ -459,7 +462,7 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
 
     switch 
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
       case TAR:
-        File tarFile = new File(localSegmentTarURI);
+        File tarFile = new File(outputSegmentTarURI);
         String segmentName = segmentConversionResult.getSegmentName();
         String tableNameWithType = 
segmentConversionResult.getTableNameWithType();
         String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
@@ -469,10 +472,12 @@ public abstract class 
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
       case METADATA:
         if 
(taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
           URI outputSegmentDirURI = 
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
-          Map<String, String> segmentUriToTarPathMap =
-              SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, 
pushJobSpec,
-                  new String[]{localSegmentTarURI.toString()});
-          SegmentPushUtils.sendSegmentUriAndMetadata(spec, LOCAL_PINOT_FS, 
segmentUriToTarPathMap, headers, parameters);
+          try (PinotFS outputFileFS = 
MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+            Map<String, String> segmentUriToTarPathMap =
+                
SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
+                    new String[]{outputSegmentTarURI.toString()});
+            SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, 
segmentUriToTarPathMap, headers, parameters);
+          }
         } else {
           throw new RuntimeException("Output dir URI missing for metadata 
push");
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to