This is an automated email from the ASF dual-hosted git repository.

karan pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/30.0.0 by this push:
     new 36bfd2b2e2f Maintain a connection while exporting results with MSQ 
(#16381) (#16414)
36bfd2b2e2f is described below

commit 36bfd2b2e2fb45c6144f0efcb027ef2ff62963bd
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu May 9 17:47:09 2024 +0530

    Maintain a connection while exporting results with MSQ (#16381) (#16414)
    
    * Maintain a connection while exporting results with MSQ
    
    * Fix checkstyle
    
    * Fix checkstyle
    
    * Move initialization from constructor
    
    * Add null check
    
    * Address review comments
---
 .../results/ExportResultsFrameProcessor.java       | 108 ++++++++++++---------
 .../org/apache/druid/storage/StorageConnector.java |   7 +-
 .../storage/local/LocalFileStorageConnector.java   |   2 +-
 3 files changed, 68 insertions(+), 49 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
index 56b287781c2..e3635338231 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
@@ -61,10 +61,12 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor<Object>
   private final StorageConnector storageConnector;
   private final ObjectMapper jsonMapper;
   private final ChannelCounters channelCounter;
-  final String exportFilePath;
+  private final String exportFilePath;
   private final Object2IntMap<String> outputColumnNameToFrameColumnNumberMap;
   private final RowSignature exportRowSignature;
 
+  private volatile ResultFormat.Writer exportWriter;
+
   public ExportResultsFrameProcessor(
       final ReadableFrameChannel inputChannel,
       final ResultFormat exportFormat,
@@ -129,65 +131,77 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor<Object>
     }
 
     if (inputChannel.isFinished()) {
+      exportWriter.writeResponseEnd();
       return ReturnOrAwait.returnObject(exportFilePath);
     } else {
+      if (exportWriter == null) {
+        createExportWriter();
+      }
       exportFrame(inputChannel.read());
       return ReturnOrAwait.awaitAll(1);
     }
   }
 
-  private void exportFrame(final Frame frame) throws IOException
+  private void exportFrame(final Frame frame)
   {
     final Sequence<Cursor> cursorSequence =
         new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
             .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, 
Granularities.ALL, false, null);
 
-    // Add headers if we are writing to a new file.
-    final boolean writeHeader = !storageConnector.pathExists(exportFilePath);
-
-    try (OutputStream stream = storageConnector.write(exportFilePath)) {
-      ResultFormat.Writer formatter = exportFormat.createFormatter(stream, 
jsonMapper);
-      formatter.writeResponseStart();
-
-      if (writeHeader) {
-        formatter.writeHeaderFromRowSignature(exportRowSignature, false);
-      }
-
-      SequenceUtils.forEach(
-          cursorSequence,
-          cursor -> {
-            try {
-              final ColumnSelectorFactory columnSelectorFactory = 
cursor.getColumnSelectorFactory();
-
-              //noinspection rawtypes
-              @SuppressWarnings("rawtypes")
-              final List<BaseObjectColumnValueSelector> selectors =
-                  frameReader.signature()
-                             .getColumnNames()
-                             .stream()
-                             
.map(columnSelectorFactory::makeColumnValueSelector)
-                             .collect(Collectors.toList());
-
-              while (!cursor.isDone()) {
-                formatter.writeRowStart();
-                for (int j = 0; j < exportRowSignature.size(); j++) {
-                  String columnName = exportRowSignature.getColumnName(j);
-                  BaseObjectColumnValueSelector<?> selector = 
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
-                  formatter.writeRowField(columnName, selector.getObject());
-                }
-                channelCounter.incrementRowCount();
-                formatter.writeRowEnd();
-                cursor.advance();
+    SequenceUtils.forEach(
+        cursorSequence,
+        cursor -> {
+          try {
+            final ColumnSelectorFactory columnSelectorFactory = 
cursor.getColumnSelectorFactory();
+
+            //noinspection rawtypes
+            final List<BaseObjectColumnValueSelector> selectors =
+                frameReader.signature()
+                           .getColumnNames()
+                           .stream()
+                           .map(columnSelectorFactory::makeColumnValueSelector)
+                           .collect(Collectors.toList());
+
+            while (!cursor.isDone()) {
+              exportWriter.writeRowStart();
+              for (int j = 0; j < exportRowSignature.size(); j++) {
+                String columnName = exportRowSignature.getColumnName(j);
+                BaseObjectColumnValueSelector<?> selector = 
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
+                exportWriter.writeRowField(columnName, selector.getObject());
               }
-            }
-            catch (IOException e) {
-              throw DruidException.forPersona(DruidException.Persona.USER)
-                                  
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
-                                  .build(e, "Exception occurred while writing 
file to the export location [%s].", exportFilePath);
+              channelCounter.incrementRowCount();
+              exportWriter.writeRowEnd();
+              cursor.advance();
             }
           }
-      );
-      formatter.writeResponseEnd();
+          catch (IOException e) {
+            throw DruidException.forPersona(DruidException.Persona.USER)
+                                
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                                .build(e, "Exception occurred while writing 
file to the export location [%s].", exportFilePath);
+          }
+        }
+    );
+  }
+
+  private void createExportWriter() throws IOException
+  {
+    OutputStream stream = null;
+    try {
+      stream = storageConnector.write(exportFilePath);
+      exportWriter = exportFormat.createFormatter(stream, jsonMapper);
+      exportWriter.writeResponseStart();
+      exportWriter.writeHeaderFromRowSignature(exportRowSignature, false);
+    }
+    catch (IOException e) {
+      if (exportWriter != null) {
+        exportWriter.close();
+      }
+      if (stream != null) {
+        stream.close();
+      }
+      throw DruidException.forPersona(DruidException.Persona.USER)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build(e, "Exception occurred while opening a stream 
to the export location [%s].", exportFilePath);
     }
   }
 
@@ -195,5 +209,9 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor<Object>
   public void cleanup() throws IOException
   {
     FrameProcessors.closeAll(inputChannels(), outputChannels());
+
+    if (exportWriter != null) {
+      exportWriter.close();
+    }
   }
 }
diff --git 
a/processing/src/main/java/org/apache/druid/storage/StorageConnector.java 
b/processing/src/main/java/org/apache/druid/storage/StorageConnector.java
index 3d1ead89b1e..d99d9469f08 100644
--- a/processing/src/main/java/org/apache/druid/storage/StorageConnector.java
+++ b/processing/src/main/java/org/apache/druid/storage/StorageConnector.java
@@ -90,12 +90,13 @@ public interface StorageConnector
 
   /**
    * Open an {@link OutputStream} for writing data to the path in the 
underlying storage system.
-   * Most implementations prepend the input path with a basePath.
+   * Most implementations prepend the input path with a basePath. If an object 
exists at the path,
+   * the existing object will be overwritten by the write operation.
    * Callers are advised to namespace their files as there might be race 
conditions.
    * The caller should take care of closing the stream when done or in case of 
error.
    *
-   * @param path
-   * @return
+   * @param path to write
+   * @return OutputStream to the path
    * @throws IOException
    */
   OutputStream write(String path) throws IOException;
diff --git 
a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
 
b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
index 225f3eb8537..3d96f8d43b1 100644
--- 
a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
+++ 
b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
@@ -101,7 +101,7 @@ public class LocalFileStorageConnector implements 
StorageConnector
   {
     File toWrite = fileWithBasePath(path);
     FileUtils.mkdirp(toWrite.getParentFile());
-    return Files.newOutputStream(toWrite.toPath(), StandardOpenOption.CREATE, 
StandardOpenOption.APPEND);
+    return Files.newOutputStream(toWrite.toPath());
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to