This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new d9b4723187 Revert "Using local copy of segment instead of downloading
from remote (#14429)" (#14484)
d9b4723187 is described below
commit d9b47231872e86f1d9217fdbb034118b48b207c6
Author: Ragesh Rajagopalan <[email protected]>
AuthorDate: Mon Nov 18 16:22:26 2024 -0800
Revert "Using local copy of segment instead of downloading from remote
(#14429)" (#14484)
This reverts commit b90d1da2edb82e92f43c873edf93d1f32dceee40.
---
.../BaseMultipleSegmentsConversionExecutor.java | 33 +++++++++++++---------
1 file changed, 19 insertions(+), 14 deletions(-)
diff --git
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
index 11b0116429..19d145ce3d 100644
---
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
+++
b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java
@@ -55,7 +55,6 @@ import org.apache.pinot.segment.local.utils.SegmentPushUtils;
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.spi.auth.AuthProvider;
import org.apache.pinot.spi.config.table.TableType;
-import org.apache.pinot.spi.filesystem.LocalPinotFS;
import org.apache.pinot.spi.filesystem.PinotFS;
import org.apache.pinot.spi.ingestion.batch.BatchConfigProperties;
import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec;
@@ -80,7 +79,6 @@ import org.slf4j.LoggerFactory;
public abstract class BaseMultipleSegmentsConversionExecutor extends
BaseTaskExecutor {
private static final Logger LOGGER =
LoggerFactory.getLogger(BaseMultipleSegmentsConversionExecutor.class);
private static final String CUSTOM_SEGMENT_UPLOAD_CONTEXT_LINEAGE_ENTRY_ID =
"lineageEntryId";
- private static final PinotFS LOCAL_PINOT_FS = new LocalPinotFS();
private static final int DEFUALT_PUSH_ATTEMPTS = 5;
private static final int DEFAULT_PUSH_PARALLELISM = 1;
@@ -286,11 +284,14 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
String.format("Uploading segment: %s (%d out of %d)",
resultSegmentName, (i + 1), numOutputSegments));
String pushMode =
taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.TAR.name());
+ URI outputSegmentTarURI;
if
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())
!= BatchConfigProperties.SegmentPushType.TAR) {
- URI outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs,
convertedTarredSegmentFile);
+ outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs,
convertedTarredSegmentFile);
LOGGER.info("Moved generated segment from [{}] to location: [{}]",
convertedTarredSegmentFile,
outputSegmentTarURI);
+ } else {
+ outputSegmentTarURI = convertedTarredSegmentFile.toURI();
}
// Set segment ZK metadata custom map modifier into HTTP header to
modify the segment ZK metadata
@@ -315,12 +316,11 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
}
if (batchSegmentUpload) {
- updateSegmentUriToTarPathMap(taskConfigs,
convertedTarredSegmentFile.toURI(), segmentConversionResult,
+ updateSegmentUriToTarPathMap(taskConfigs, outputSegmentTarURI,
segmentConversionResult,
segmentUriToTarPathMap, pushJobSpec);
} else {
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
- pushSegment(rawTableName, taskConfigs,
convertedTarredSegmentFile.toURI(), httpHeaders, parameters,
- segmentConversionResult);
+ pushSegment(rawTableName, taskConfigs, outputSegmentTarURI,
httpHeaders, parameters, segmentConversionResult);
if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) {
LOGGER.warn("Failed to delete tarred converted segment: {}",
convertedTarredSegmentFile.getAbsolutePath());
}
@@ -438,15 +438,18 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
List<Header> headers = getSegmentPushCommonHeaders(pinotTaskConfig,
authProvider, segmentConversionResults);
List<NameValuePair> parameters =
getSegmentPushCommonParams(tableNameWithType);
- SegmentPushUtils.sendSegmentsUriAndMetadata(spec, LOCAL_PINOT_FS,
segmentUriToTarPathMap, headers, parameters);
+ URI outputSegmentDirURI =
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
+ try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs,
outputSegmentDirURI)) {
+ SegmentPushUtils.sendSegmentsUriAndMetadata(spec, outputFileFS,
segmentUriToTarPathMap, headers, parameters);
+ }
}
- private void pushSegment(String tableName, Map<String, String> taskConfigs,
URI localSegmentTarURI,
+ private void pushSegment(String tableName, Map<String, String> taskConfigs,
URI outputSegmentTarURI,
List<Header> headers, List<NameValuePair> parameters,
SegmentConversionResult segmentConversionResult)
throws Exception {
String pushMode =
taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE,
BatchConfigProperties.SegmentPushType.TAR.name());
- LOGGER.info("Trying to push Pinot segment with push mode {} from {}",
pushMode, localSegmentTarURI);
+ LOGGER.info("Trying to push Pinot segment with push mode {} from {}",
pushMode, outputSegmentTarURI);
PushJobSpec pushJobSpec = new PushJobSpec();
pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS);
@@ -459,7 +462,7 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
switch
(BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) {
case TAR:
- File tarFile = new File(localSegmentTarURI);
+ File tarFile = new File(outputSegmentTarURI);
String segmentName = segmentConversionResult.getSegmentName();
String tableNameWithType =
segmentConversionResult.getTableNameWithType();
String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY);
@@ -469,10 +472,12 @@ public abstract class
BaseMultipleSegmentsConversionExecutor extends BaseTaskExe
case METADATA:
if
(taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) {
URI outputSegmentDirURI =
URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI));
- Map<String, String> segmentUriToTarPathMap =
- SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI,
pushJobSpec,
- new String[]{localSegmentTarURI.toString()});
- SegmentPushUtils.sendSegmentUriAndMetadata(spec, LOCAL_PINOT_FS,
segmentUriToTarPathMap, headers, parameters);
+ try (PinotFS outputFileFS =
MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) {
+ Map<String, String> segmentUriToTarPathMap =
+
SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec,
+ new String[]{outputSegmentTarURI.toString()});
+ SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS,
segmentUriToTarPathMap, headers, parameters);
+ }
} else {
throw new RuntimeException("Output dir URI missing for metadata
push");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]