cryptoe commented on code in PR #13368:
URL: https://github.com/apache/druid/pull/13368#discussion_r1060723306


##########
processing/src/main/java/org/apache/druid/frame/processor/DurableStorageOutputChannelFactory.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import com.google.common.io.CountingOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
+import 
org.apache.druid.frame.channel.DurableStoragePartitionedReadableFrameChannel;
+import org.apache.druid.frame.channel.ReadableInputStreamFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameFileChannel;
+import org.apache.druid.frame.file.FrameFileFooter;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.frame.util.DurableStorageUtils;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.MappedByteBufferHandler;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Supplier;
+
+public class DurableStorageOutputChannelFactory implements OutputChannelFactory
+{
+  private static final Logger LOG = new 
Logger(DurableStorageOutputChannelFactory.class);
+
+  private final String controllerTaskId;
+  private final int workerNumber;
+  private final int stageNumber;
+  private final String taskId;
+  private final int frameSize;
+  private final StorageConnector storageConnector;
+  private final File tmpDir;
+  private final ExecutorService remoteInputStreamPool;
+
+  public DurableStorageOutputChannelFactory(
+      final String controllerTaskId,
+      final int workerNumber,
+      final int stageNumber,
+      final String taskId,
+      final int frameSize,
+      final StorageConnector storageConnector,
+      final File tmpDir
+  )
+  {
+    this.controllerTaskId = Preconditions.checkNotNull(controllerTaskId, 
"controllerTaskId");
+    this.workerNumber = workerNumber;
+    this.stageNumber = stageNumber;
+    this.taskId = taskId;
+    this.frameSize = frameSize;
+    this.storageConnector = Preconditions.checkNotNull(storageConnector, 
"storageConnector");
+    this.tmpDir = Preconditions.checkNotNull(tmpDir, "tmpDir is null");
+    this.remoteInputStreamPool =
+        
Executors.newCachedThreadPool(Execs.makeThreadFactory("-remote-fetcher-%d"));
+  }
+
+  /**
+   * Creates an instance that is the standard production implementation. 
Closeable items are registered with
+   * the provided Closer.
+   */
+  public static DurableStorageOutputChannelFactory 
createStandardImplementation(
+      final String controllerTaskId,
+      final int workerNumber,
+      final int stageNumber,
+      final String taskId,
+      final int frameSize,
+      final StorageConnector storageConnector,
+      final File tmpDir
+  )
+  {
+    return new DurableStorageOutputChannelFactory(
+        controllerTaskId,
+        workerNumber,
+        stageNumber,
+        taskId,
+        frameSize,
+        storageConnector,
+        tmpDir
+    );
+  }
+
+  @Override
+  public OutputChannel openChannel(int partitionNumber) throws IOException
+  {
+    final String fileName = 
DurableStorageUtils.getPartitionOutputsFileNameForPartition(
+        controllerTaskId,
+        stageNumber,
+        workerNumber,
+        taskId,
+        partitionNumber
+    );
+    final WritableFrameFileChannel writableChannel =
+        new WritableFrameFileChannel(
+            FrameFileWriter.open(
+                Channels.newChannel(storageConnector.write(fileName)),
+                null
+            )
+        );
+
+    return OutputChannel.pair(
+        writableChannel,
+        ArenaMemoryAllocator.createOnHeap(frameSize),
+        () -> {
+          try {
+            RetryUtils.retry(() -> {
+              if (!storageConnector.pathExists(fileName)) {
+                throw new ISE("File does not exist : %s", fileName);
+              }
+              return Boolean.TRUE;
+            }, (throwable) -> true, 10);
+          }
+          catch (Exception exception) {
+            throw new RuntimeException(exception);
+          }
+          try {
+            return ReadableInputStreamFrameChannel.open(
+                storageConnector.read(fileName),
+                fileName,
+                remoteInputStreamPool,
+                false
+            );
+          }
+          catch (IOException e) {
+            throw new UncheckedIOException(StringUtils.format("Unable to read 
file : %s", fileName), e);
+          }
+        },
+        partitionNumber
+    );
+  }
+
+  @Override
+  public PartitionedOutputChannel openPartitionedChannel(String name, boolean 
deleteAfterRead) throws IOException
+  {
+    final String fileName = DurableStorageUtils.getOutputsFileNameForPath(
+        controllerTaskId,
+        stageNumber,
+        workerNumber,
+        taskId,
+        name
+    );
+    final CountingOutputStream countingOutputStream = new 
CountingOutputStream(storageConnector.write(fileName));
+    final WritableFrameFileChannel writableChannel =
+        new WritableFrameFileChannel(
+            FrameFileWriter.open(
+                Channels.newChannel(countingOutputStream),
+                ByteBuffer.allocate(Frame.compressionBufferSize(frameSize))
+            )
+        );
+
+    final Supplier<Long> channelSizeSupplier = countingOutputStream::getCount;
+
+    final File footerFile = new File(tmpDir, fileName + "_footer");

Review Comment:
   So for each frame we would create a file? Won't there be a lot of files,
   since each remote file can now host a lot of frames?
   



##########
processing/src/main/java/org/apache/druid/frame/processor/DurableStorageOutputChannelFactory.java:
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.processor;
+
+import com.google.common.base.Preconditions;
+import com.google.common.base.Suppliers;
+import com.google.common.io.CountingOutputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.frame.Frame;
+import org.apache.druid.frame.allocation.ArenaMemoryAllocator;
+import 
org.apache.druid.frame.channel.DurableStoragePartitionedReadableFrameChannel;
+import org.apache.druid.frame.channel.ReadableInputStreamFrameChannel;
+import org.apache.druid.frame.channel.WritableFrameFileChannel;
+import org.apache.druid.frame.file.FrameFileFooter;
+import org.apache.druid.frame.file.FrameFileWriter;
+import org.apache.druid.frame.util.DurableStorageUtils;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.MappedByteBufferHandler;
+import org.apache.druid.java.util.common.RetryUtils;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.concurrent.Execs;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.storage.StorageConnector;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.Supplier;
+
+public class DurableStorageOutputChannelFactory implements OutputChannelFactory
+{
+  private static final Logger LOG = new 
Logger(DurableStorageOutputChannelFactory.class);
+
+  private final String controllerTaskId;
+  private final int workerNumber;
+  private final int stageNumber;
+  private final String taskId;
+  private final int frameSize;
+  private final StorageConnector storageConnector;
+  private final File tmpDir;
+  private final ExecutorService remoteInputStreamPool;
+
+  public DurableStorageOutputChannelFactory(
+      final String controllerTaskId,
+      final int workerNumber,
+      final int stageNumber,
+      final String taskId,
+      final int frameSize,
+      final StorageConnector storageConnector,
+      final File tmpDir
+  )
+  {
+    this.controllerTaskId = Preconditions.checkNotNull(controllerTaskId, 
"controllerTaskId");
+    this.workerNumber = workerNumber;
+    this.stageNumber = stageNumber;
+    this.taskId = taskId;
+    this.frameSize = frameSize;
+    this.storageConnector = Preconditions.checkNotNull(storageConnector, 
"storageConnector");
+    this.tmpDir = Preconditions.checkNotNull(tmpDir, "tmpDir is null");
+    this.remoteInputStreamPool =
+        
Executors.newCachedThreadPool(Execs.makeThreadFactory("-remote-fetcher-%d"));
+  }
+
+  /**
+   * Creates an instance that is the standard production implementation. 
Closeable items are registered with
+   * the provided Closer.
+   */
+  public static DurableStorageOutputChannelFactory 
createStandardImplementation(
+      final String controllerTaskId,
+      final int workerNumber,
+      final int stageNumber,
+      final String taskId,
+      final int frameSize,
+      final StorageConnector storageConnector,
+      final File tmpDir
+  )
+  {
+    return new DurableStorageOutputChannelFactory(
+        controllerTaskId,
+        workerNumber,
+        stageNumber,
+        taskId,
+        frameSize,
+        storageConnector,
+        tmpDir
+    );
+  }
+
+  @Override
+  public OutputChannel openChannel(int partitionNumber) throws IOException
+  {
+    final String fileName = 
DurableStorageUtils.getPartitionOutputsFileNameForPartition(
+        controllerTaskId,
+        stageNumber,
+        workerNumber,
+        taskId,
+        partitionNumber
+    );
+    final WritableFrameFileChannel writableChannel =
+        new WritableFrameFileChannel(
+            FrameFileWriter.open(
+                Channels.newChannel(storageConnector.write(fileName)),
+                null
+            )
+        );
+
+    return OutputChannel.pair(
+        writableChannel,
+        ArenaMemoryAllocator.createOnHeap(frameSize),
+        () -> {
+          try {
+            RetryUtils.retry(() -> {
+              if (!storageConnector.pathExists(fileName)) {
+                throw new ISE("File does not exist : %s", fileName);
+              }
+              return Boolean.TRUE;
+            }, (throwable) -> true, 10);
+          }
+          catch (Exception exception) {
+            throw new RuntimeException(exception);
+          }
+          try {
+            return ReadableInputStreamFrameChannel.open(
+                storageConnector.read(fileName),
+                fileName,
+                remoteInputStreamPool,
+                false
+            );
+          }
+          catch (IOException e) {
+            throw new UncheckedIOException(StringUtils.format("Unable to read 
file : %s", fileName), e);
+          }
+        },
+        partitionNumber
+    );
+  }
+
+  @Override
+  public PartitionedOutputChannel openPartitionedChannel(String name, boolean 
deleteAfterRead) throws IOException
+  {
+    final String fileName = DurableStorageUtils.getOutputsFileNameForPath(
+        controllerTaskId,
+        stageNumber,
+        workerNumber,
+        taskId,
+        name
+    );
+    final CountingOutputStream countingOutputStream = new 
CountingOutputStream(storageConnector.write(fileName));
+    final WritableFrameFileChannel writableChannel =
+        new WritableFrameFileChannel(
+            FrameFileWriter.open(
+                Channels.newChannel(countingOutputStream),
+                ByteBuffer.allocate(Frame.compressionBufferSize(frameSize))
+            )
+        );
+
+    final Supplier<Long> channelSizeSupplier = countingOutputStream::getCount;
+
+    final File footerFile = new File(tmpDir, fileName + "_footer");
+    // build supplier for reader the footer of the underlying frame file

Review Comment:
   nit: "reader" should be "reading"



##########
processing/src/main/java/org/apache/druid/frame/file/FrameFileFooter.java:
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.frame.file;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.IOE;
+import org.apache.druid.java.util.common.ISE;
+
+import java.nio.ByteOrder;
+
+/**
+ * Encapsulation for frame file footer related operations. The footer must be 
wrapped in a memory object (the memory
+ * can be physical or mmaped). Some verifications are also done on the footer 
to see if it is not corrupted.

Review Comment:
   nit: let's link the Javadoc of FrameFile to this class so that readers can
   find out the format of the frame footer.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to