jihoonson commented on a change in pull request #11507:
URL: https://github.com/apache/druid/pull/11507#discussion_r678800376
##########
File path:
extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
##########
@@ -78,15 +78,19 @@ public DataSegment push(final File indexFilesDir, final DataSegment inSegment, f
       throws IOException
   {
     final String path = OssUtils.constructSegmentPath(config.getPrefix(), getStorageDir(inSegment, useUniquePath));
-
     log.debug("Copying segment[%s] to OSS at location[%s]", inSegment.getId(), path);
+    return pushToPath(indexFilesDir, inSegment, path);
+  }
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String path) throws IOException
+  {
     final File zipOutFile = File.createTempFile("druid", "index.zip");
     final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
     final DataSegment outSegment = inSegment.withSize(indexSize)
-                                            .withLoadSpec(makeLoadSpec(config.getBucket(), path))
-                                            .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
+        .withLoadSpec(makeLoadSpec(config.getBucket(), path))
+        .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
Review comment:
These changes don't seem to follow the Druid code style. Are you using
https://github.com/apache/druid/blob/master/dev/druid_intellij_formatting.xml?
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
##########
@@ -53,7 +56,7 @@
*
* @return size of the writen segment
Review comment:
This method no longer returns the size written. Can you fix the javadoc
as well?
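A hedged sketch of what the corrected javadoc might look like, assuming the method now returns the pushed `DataSegment` (as `addSegment` in `DeepStorageIntermediaryDataManager` does in this PR; the exact wording and signature should be checked against the actual code):

```java
/**
 * Adds a partial segment for the given supervisor task and sub task.
 *
 * @return the added {@link DataSegment}, with its load spec pointing at
 *         the stored partition (assumed return semantics; verify against
 *         the implementation in this PR)
 */
DataSegment addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
    throws IOException;
```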
##########
File path:
extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
##########
@@ -78,15 +78,19 @@ public DataSegment push(final File indexFilesDir, final DataSegment inSegment, f
       throws IOException
   {
     final String path = OssUtils.constructSegmentPath(config.getPrefix(), getStorageDir(inSegment, useUniquePath));
-
     log.debug("Copying segment[%s] to OSS at location[%s]", inSegment.getId(), path);
+    return pushToPath(indexFilesDir, inSegment, path);
+  }
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String path) throws IOException
+  {
     final File zipOutFile = File.createTempFile("druid", "index.zip");
     final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
     final DataSegment outSegment = inSegment.withSize(indexSize)
-                                            .withLoadSpec(makeLoadSpec(config.getBucket(), path))
-                                            .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
+        .withLoadSpec(makeLoadSpec(config.getBucket(), path))
+        .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
Review comment:
Please fix the indentation in here and other places.
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.io.ByteSource;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.batch.parallel.DeepStoragePartitionStat;
+import org.apache.druid.indexing.common.task.batch.parallel.GenericPartitionStat;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
+
+public class DeepStorageIntermediaryDataManager implements IntermediaryDataManager
+{
+ public static final String SHUFFLE_DATA_DIR_PREFIX = "shuffle-data";
+ private final DataSegmentPusher dataSegmentPusher;
+
+ @Override
+ public DataSegment addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
+ throws IOException
+ {
+ final BucketNumberedShardSpec<?> bucketNumberedShardSpec = (BucketNumberedShardSpec<?>) segment.getShardSpec();
+ final String partitionFilePath = getPartitionFilePath(
+ supervisorTaskId,
+ subTaskId,
+ segment.getInterval(),
+ bucketNumberedShardSpec.getBucketId() // we must use the bucket ID instead of partition ID
+ );
+ return dataSegmentPusher.pushToPath(segmentDir, segment, SHUFFLE_DATA_DIR_PREFIX + "/" + partitionFilePath);
+ }
+
+ @Inject
+ public DeepStorageIntermediaryDataManager(DataSegmentPusher dataSegmentPusher)
+ {
+ this.dataSegmentPusher = dataSegmentPusher;
+ }
+
+ @Override
+ public GenericPartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment)
+ {
+ return new DeepStoragePartitionStat(
+ toolbox.getTaskExecutorNode().getHost(),
+ toolbox.getTaskExecutorNode().getPortToUse(),
+ toolbox.getTaskExecutorNode().isEnableTlsPort(),
+ segment.getInterval(),
+ (BucketNumberedShardSpec) segment.getShardSpec(),
+ null, // numRows is not supported yet
+ null, // sizeBytes is not supported yet
+ segment.getLoadSpec()
+ );
+ }
+
+ @Nullable
+ @Override
+ public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId)
+ {
+ return Optional.empty();
Review comment:
I guess this should never be called? If so, it would be better to throw an
exception instead of returning an empty Optional.
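An illustrative sketch of the fail-fast approach suggested above. The class and method names loosely mirror the PR, but the Druid-specific types (`ByteSource`, `Interval`) are replaced with placeholders so the snippet stands alone:

```java
import java.io.File;
import java.util.Optional;

// Sketch: a code path that should never be reached throws instead of
// silently returning Optional.empty(). The class name and parameter list
// are simplified placeholders, not the actual Druid API.
class DeepStorageShuffleSketch
{
  public Optional<File> findPartitionFile(String supervisorTaskId, String subTaskId)
  {
    // Deep-storage shuffle serves partitions through load specs rather than
    // worker-local files, so any call to this method indicates a bug.
    throw new UnsupportedOperationException(
        "findPartitionFile is not supported for deep storage shuffle"
    );
  }
}
```

Throwing `UnsupportedOperationException` surfaces the misuse immediately, whereas an empty Optional would let a caller silently treat the partition as missing.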
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocation.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class DeepStoragePartitionLocation extends GenericPartitionLocation
Review comment:
It seems like only `subTaskId` and `loadSpec` are used in this class, which
makes me wonder why it needs the other fields. I suggest adding a new
interface for `PartitionLocation` and letting `DeepStoragePartitionLocation`
implement it instead of extending `GenericPartitionLocation`, because
`GenericPartitionLocation` is designed for local-storage shuffle. Since we
already have an abstract class with the same name, `PartitionLocation`, you
will have to rename it or choose another name for the new interface.
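A rough sketch of the refactoring suggested above. The interface and class names here are placeholders (the real names would clash with the existing `PartitionLocation` abstract class, as noted); the point is that a minimal location interface carries only what deep-storage shuffle actually uses:

```java
import java.util.Collections;
import java.util.Map;

// Placeholder interface: only the sub-task ID and the load spec used to
// fetch the partition from deep storage. Names are illustrative, not the
// actual Druid API.
interface ShufflePartitionLocation
{
  String getSubTaskId();

  Map<String, Object> getLoadSpec();
}

// The deep-storage implementation keeps only the two fields it needs,
// instead of inheriting host/port/shard-spec fields it never reads.
class DeepStorageLocationSketch implements ShufflePartitionLocation
{
  private final String subTaskId;
  private final Map<String, Object> loadSpec;

  DeepStorageLocationSketch(String subTaskId, Map<String, Object> loadSpec)
  {
    this.subTaskId = subTaskId;
    this.loadSpec = loadSpec;
  }

  @Override
  public String getSubTaskId()
  {
    return subTaskId;
  }

  @Override
  public Map<String, Object> getLoadSpec()
  {
    return loadSpec;
  }
}
```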
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStat.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+public class DeepStoragePartitionStat extends GenericPartitionStat
+{
+ private final Map<String, Object> loadSpec;
+
+ @JsonCreator
+ public DeepStoragePartitionStat(
+ @JsonProperty("taskExecutorHost") String taskExecutorHost,
+ @JsonProperty("taskExecutorPort") int taskExecutorPort,
+ @JsonProperty("useHttps") boolean useHttps,
Review comment:
Similarly, I suggest adding a new interface for `PartitionStat` and
renaming the existing `PartitionStat` abstract class to something else. Then
this class can implement the new `PartitionStat` interface and keep only the
necessary fields. I think we will probably never need `useHttps`.
`taskExecutorHost` and `taskExecutorPort` might be useful in the future if we
want to collect per-task shuffle metrics in the supervisor task, but since
I'm not sure exactly what we would want yet, I suggest removing them and
keeping only the necessary fields for now.
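A hedged sketch of the slimmed-down stat suggested above. Names and field types are placeholders (for instance, the interval is a `String` here rather than a Joda `Interval`, to keep the snippet self-contained); the idea is simply dropping `useHttps`, `taskExecutorHost`, and `taskExecutorPort`:

```java
import java.util.Collections;
import java.util.Map;

// Placeholder interface for a shuffle partition stat; illustrative only.
interface ShufflePartitionStat
{
  String getInterval();

  int getBucketId();
}

// Deep-storage variant keeping only interval, bucket ID, and load spec,
// without the executor host/port/TLS fields from GenericPartitionStat.
class DeepStorageStatSketch implements ShufflePartitionStat
{
  private final String interval;
  private final int bucketId;
  private final Map<String, Object> loadSpec;

  DeepStorageStatSketch(String interval, int bucketId, Map<String, Object> loadSpec)
  {
    this.interval = interval;
    this.bucketId = bucketId;
    this.loadSpec = loadSpec;
  }

  @Override
  public String getInterval()
  {
    return interval;
  }

  @Override
  public int getBucketId()
  {
    return bucketId;
  }

  Map<String, Object> getLoadSpec()
  {
    return loadSpec;
  }
}
```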
##########
File path:
indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
##########
@@ -74,4 +77,30 @@
*
*/
void deletePartitions(String supervisorTaskId) throws IOException;
+
+ GenericPartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment);
+
+ default String getPartitionFilePath(
+ String supervisorTaskId,
+ String subTaskId,
+ Interval interval,
+ int bucketId
+ )
+ {
+ return Paths.get(getPartitionDir(supervisorTaskId, interval, bucketId), subTaskId).toString();
+ }
+
+ default String getPartitionDir(
Review comment:
nit: maybe `getPartitionDirPath()`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]