jihoonson commented on a change in pull request #11507:
URL: https://github.com/apache/druid/pull/11507#discussion_r678800376



##########
File path: 
extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
##########
@@ -78,15 +78,19 @@ public DataSegment push(final File indexFilesDir, final DataSegment inSegment, f
       throws IOException
   {
     final String path = OssUtils.constructSegmentPath(config.getPrefix(), getStorageDir(inSegment, useUniquePath));
-
     log.debug("Copying segment[%s] to OSS at location[%s]", inSegment.getId(), path);
+    return pushToPath(indexFilesDir, inSegment, path);
+  }
 
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String path) throws IOException
+  {
     final File zipOutFile = File.createTempFile("druid", "index.zip");
     final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
 
     final DataSegment outSegment = inSegment.withSize(indexSize)
-                                            .withLoadSpec(makeLoadSpec(config.getBucket(), path))
-                                            .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
+        .withLoadSpec(makeLoadSpec(config.getBucket(), path))
+        .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));

Review comment:
       These changes don't seem to follow the Druid code style. Are you using 
https://github.com/apache/druid/blob/master/dev/druid_intellij_formatting.xml?

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
##########
@@ -53,7 +56,7 @@
    *
    * @return size of the writen segment

Review comment:
       This method no longer returns the size written. Can you fix the javadoc 
as well?

##########
File path: 
extensions-contrib/aliyun-oss-extensions/src/main/java/org/apache/druid/storage/aliyun/OssDataSegmentPusher.java
##########
@@ -78,15 +78,19 @@ public DataSegment push(final File indexFilesDir, final DataSegment inSegment, f
       throws IOException
   {
     final String path = OssUtils.constructSegmentPath(config.getPrefix(), getStorageDir(inSegment, useUniquePath));
-
     log.debug("Copying segment[%s] to OSS at location[%s]", inSegment.getId(), path);
+    return pushToPath(indexFilesDir, inSegment, path);
+  }
 
+  @Override
+  public DataSegment pushToPath(File indexFilesDir, DataSegment inSegment, String path) throws IOException
+  {
     final File zipOutFile = File.createTempFile("druid", "index.zip");
     final long indexSize = CompressionUtils.zip(indexFilesDir, zipOutFile);
 
     final DataSegment outSegment = inSegment.withSize(indexSize)
-                                            .withLoadSpec(makeLoadSpec(config.getBucket(), path))
-                                            .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));
+        .withLoadSpec(makeLoadSpec(config.getBucket(), path))
+        .withBinaryVersion(SegmentUtils.getVersionFromDir(indexFilesDir));

Review comment:
       Please fix the indentation here and in other places.

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/DeepStorageIntermediaryDataManager.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.worker.shuffle;
+
+import com.google.common.io.ByteSource;
+import com.google.inject.Inject;
+import org.apache.druid.indexing.common.TaskToolbox;
+import org.apache.druid.indexing.common.task.batch.parallel.DeepStoragePartitionStat;
+import org.apache.druid.indexing.common.task.batch.parallel.GenericPartitionStat;
+import org.apache.druid.segment.loading.DataSegmentPusher;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Optional;
+
+public class DeepStorageIntermediaryDataManager implements IntermediaryDataManager
+{
+  public static final String SHUFFLE_DATA_DIR_PREFIX = "shuffle-data";
+  private final DataSegmentPusher dataSegmentPusher;
+
+  @Override
+  public DataSegment addSegment(String supervisorTaskId, String subTaskId, DataSegment segment, File segmentDir)
+      throws IOException
+  {
+    final BucketNumberedShardSpec<?> bucketNumberedShardSpec = (BucketNumberedShardSpec<?>) segment.getShardSpec();
+    final String partitionFilePath = getPartitionFilePath(
+        supervisorTaskId,
+        subTaskId,
+        segment.getInterval(),
+        bucketNumberedShardSpec.getBucketId() // we must use the bucket ID instead of partition ID
+    );
+    return dataSegmentPusher.pushToPath(segmentDir, segment, SHUFFLE_DATA_DIR_PREFIX + "/" + partitionFilePath);
+  }
+
+  @Inject
+  public DeepStorageIntermediaryDataManager(DataSegmentPusher dataSegmentPusher)
+  {
+    this.dataSegmentPusher = dataSegmentPusher;
+  }
+
+  @Override
+  public GenericPartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment)
+  {
+    return new DeepStoragePartitionStat(
+        toolbox.getTaskExecutorNode().getHost(),
+        toolbox.getTaskExecutorNode().getPortToUse(),
+        toolbox.getTaskExecutorNode().isEnableTlsPort(),
+        segment.getInterval(),
+        (BucketNumberedShardSpec) segment.getShardSpec(),
+        null, // numRows is not supported yet
+        null, // sizeBytes is not supported yet
+        segment.getLoadSpec()
+    );
+  }
+
+  @Nullable
+  @Override
+  public Optional<ByteSource> findPartitionFile(String supervisorTaskId, String subTaskId, Interval interval, int bucketId)
+  {
+    return Optional.empty();

Review comment:
       I guess this should never be called? If so, it would be better to throw 
an exception.
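
For illustration, a minimal sketch of what that could look like, assuming the 
method really is unreachable when shuffle data lives in deep storage. The class 
names and the simplified signature (a `String` interval and `Optional<byte[]>` 
in place of Joda `Interval` and Guava `ByteSource`) are hypothetical stand-ins 
so the snippet compiles without Druid on the classpath, and the exception 
message is only a suggestion.

```java
import java.util.Optional;

// Hypothetical stand-in for DeepStorageIntermediaryDataManager; not the PR's code.
class DeepStorageLookupSketch
{
  // Simplified signature: the real method takes a Joda Interval and
  // returns Optional<ByteSource>.
  Optional<byte[]> findPartitionFile(String supervisorTaskId, String subTaskId, String interval, int bucketId)
  {
    // Fail loudly instead of returning Optional.empty(), so an unexpected
    // caller is noticed immediately rather than seeing "no data".
    throw new UnsupportedOperationException(
        "findPartitionFile is not supported for deep storage shuffle; read the partition via its loadSpec"
    );
  }
}

class DeepStorageLookupDemo
{
  public static void main(String[] args)
  {
    try {
      new DeepStorageLookupSketch().findPartitionFile("supervisor", "subtask", "2021-01-01/2021-01-02", 0);
    }
    catch (UnsupportedOperationException e) {
      System.out.println("Rejected: " + e.getMessage());
    }
  }
}
```

With an empty `Optional`, a caller cannot tell "the file is missing" apart from 
"this manager never serves files"; the exception makes the second case explicit.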

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionLocation.java
##########
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
+import org.joda.time.Interval;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class DeepStoragePartitionLocation extends GenericPartitionLocation

Review comment:
       Only `subTaskId` and `loadSpec` seem to be used in this class, so it is 
not clear to me why it needs the other fields. I suggest adding a new interface 
for `PartitionLocation` and letting `DeepStoragePartitionLocation` implement it 
instead of extending `GenericPartitionLocation`, because 
`GenericPartitionLocation` is designed for shuffle on local storage. Since we 
already have an abstract class named `PartitionLocation`, you will have to 
rename it or use another name for the new interface.
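
As a rough sketch of the shape I have in mind (all type names here are 
hypothetical, chosen to avoid the existing `PartitionLocation` abstract class, 
and the Jackson annotations are omitted so the snippet stands alone):

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical interface name; the existing abstract class already
// occupies the name `PartitionLocation`.
interface ShuffleLocationSketch
{
  String getSubTaskId();
}

// Stand-in for DeepStoragePartitionLocation that keeps only the two
// fields that appear to be used: subTaskId and loadSpec.
final class DeepStorageLocationSketch implements ShuffleLocationSketch
{
  private final String subTaskId;
  private final Map<String, Object> loadSpec;

  DeepStorageLocationSketch(String subTaskId, Map<String, Object> loadSpec)
  {
    this.subTaskId = Objects.requireNonNull(subTaskId, "subTaskId");
    // Defensive copy so later changes to the caller's map do not leak in.
    this.loadSpec = Collections.unmodifiableMap(new HashMap<>(loadSpec));
  }

  @Override
  public String getSubTaskId()
  {
    return subTaskId;
  }

  Map<String, Object> getLoadSpec()
  {
    return loadSpec;
  }
}

class DeepStorageLocationDemo
{
  public static void main(String[] args)
  {
    Map<String, Object> loadSpec = new HashMap<>();
    loadSpec.put("type", "illustrative"); // made-up value, not a real load spec
    DeepStorageLocationSketch location = new DeepStorageLocationSketch("subtask-1", loadSpec);
    System.out.println(location.getSubTaskId() + " -> " + location.getLoadSpec());
  }
}
```

The point is that the deep-storage location carries no host, port, or TLS flag, 
since nothing reads them.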

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/common/task/batch/parallel/DeepStoragePartitionStat.java
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.common.task.batch.parallel;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.timeline.partition.BucketNumberedShardSpec;
+import org.apache.druid.timeline.partition.BuildingShardSpec;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+public class DeepStoragePartitionStat extends GenericPartitionStat
+{
+  private final Map<String, Object> loadSpec;
+
+  @JsonCreator
+  public DeepStoragePartitionStat(
+      @JsonProperty("taskExecutorHost") String taskExecutorHost,
+      @JsonProperty("taskExecutorPort") int taskExecutorPort,
+      @JsonProperty("useHttps") boolean useHttps,

Review comment:
       Similarly, I suggest adding a new interface for `PartitionStat` and 
renaming the existing abstract class `PartitionStat` to something else. Then 
this class can implement the new `PartitionStat` interface and keep only the 
necessary fields. I think we will probably never need `useHttps`. 
`taskExecutorHost` and `taskExecutorPort` could be useful in the future if we 
want to collect per-task shuffle metrics in the supervisor task, but as I'm not 
sure yet what exactly we would want, I suggest removing them and keeping only 
the necessary fields for now.
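
A rough sketch of that split, under the assumption that the interval and bucket 
ID are what both kinds of stat share. Every type name below is hypothetical, the 
interval is simplified to a `String`, and the Jackson annotations are left out 
so the snippet is self-contained:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical interface; in the PR it would take over the name `PartitionStat`.
interface PartitionStatSketch
{
  String getInterval(); // simplified: the real type is a Joda Interval

  int getBucketId();
}

// Local-storage shuffle still records which task executor serves the data.
final class LocalPartitionStatSketch implements PartitionStatSketch
{
  private final String interval;
  private final int bucketId;
  private final String taskExecutorHost;
  private final int taskExecutorPort;

  LocalPartitionStatSketch(String interval, int bucketId, String taskExecutorHost, int taskExecutorPort)
  {
    this.interval = interval;
    this.bucketId = bucketId;
    this.taskExecutorHost = taskExecutorHost;
    this.taskExecutorPort = taskExecutorPort;
  }

  @Override
  public String getInterval()
  {
    return interval;
  }

  @Override
  public int getBucketId()
  {
    return bucketId;
  }

  String getTaskExecutorHost()
  {
    return taskExecutorHost;
  }

  int getTaskExecutorPort()
  {
    return taskExecutorPort;
  }
}

// Deep-storage shuffle only needs the loadSpec; no host, port, or useHttps.
final class DeepStoragePartitionStatSketch implements PartitionStatSketch
{
  private final String interval;
  private final int bucketId;
  private final Map<String, Object> loadSpec;

  DeepStoragePartitionStatSketch(String interval, int bucketId, Map<String, Object> loadSpec)
  {
    this.interval = interval;
    this.bucketId = bucketId;
    this.loadSpec = Collections.unmodifiableMap(new HashMap<>(loadSpec));
  }

  @Override
  public String getInterval()
  {
    return interval;
  }

  @Override
  public int getBucketId()
  {
    return bucketId;
  }

  Map<String, Object> getLoadSpec()
  {
    return loadSpec;
  }
}
```

Code that only groups stats by interval and bucket can then depend on the 
interface alone, without knowing where the partition data is stored.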

##########
File path: 
indexing-service/src/main/java/org/apache/druid/indexing/worker/shuffle/IntermediaryDataManager.java
##########
@@ -74,4 +77,30 @@
    *
    */
   void deletePartitions(String supervisorTaskId) throws IOException;
+
+  GenericPartitionStat generatePartitionStat(TaskToolbox toolbox, DataSegment segment);
+
+  default String getPartitionFilePath(
+      String supervisorTaskId,
+      String subTaskId,
+      Interval interval,
+      int bucketId
+  )
+  {
+    return Paths.get(getPartitionDir(supervisorTaskId, interval, bucketId), subTaskId).toString();
+  }
+
+  default String getPartitionDir(

Review comment:
       nit: maybe `getPartitionDirPath()`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.