This is an automated email from the ASF dual-hosted git repository.
karan pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/30.0.0 by this push:
new 36bfd2b2e2f Maintain a connection while exporting results with MSQ (#16381) (#16414)
36bfd2b2e2f is described below
commit 36bfd2b2e2fb45c6144f0efcb027ef2ff62963bd
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Thu May 9 17:47:09 2024 +0530
Maintain a connection while exporting results with MSQ (#16381) (#16414)
* Maintain a connection while exporting results with MSQ
* Fix checkstyle
* Fix checkstyle
* Move initialization from constructor
* Add null check
* Address review comments
---
.../results/ExportResultsFrameProcessor.java | 108 ++++++++++++---------
.../org/apache/druid/storage/StorageConnector.java | 7 +-
.../storage/local/LocalFileStorageConnector.java | 2 +-
3 files changed, 68 insertions(+), 49 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
index 56b287781c2..e3635338231 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
@@ -61,10 +61,12 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
private final StorageConnector storageConnector;
private final ObjectMapper jsonMapper;
private final ChannelCounters channelCounter;
- final String exportFilePath;
+ private final String exportFilePath;
private final Object2IntMap<String> outputColumnNameToFrameColumnNumberMap;
private final RowSignature exportRowSignature;
+ private volatile ResultFormat.Writer exportWriter;
+
public ExportResultsFrameProcessor(
final ReadableFrameChannel inputChannel,
final ResultFormat exportFormat,
@@ -129,65 +131,77 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
}
if (inputChannel.isFinished()) {
+ exportWriter.writeResponseEnd();
return ReturnOrAwait.returnObject(exportFilePath);
} else {
+ if (exportWriter == null) {
+ createExportWriter();
+ }
exportFrame(inputChannel.read());
return ReturnOrAwait.awaitAll(1);
}
}
- private void exportFrame(final Frame frame) throws IOException
+ private void exportFrame(final Frame frame)
{
final Sequence<Cursor> cursorSequence =
new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
.makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY,
Granularities.ALL, false, null);
- // Add headers if we are writing to a new file.
- final boolean writeHeader = !storageConnector.pathExists(exportFilePath);
-
- try (OutputStream stream = storageConnector.write(exportFilePath)) {
- ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper);
- formatter.writeResponseStart();
-
- if (writeHeader) {
- formatter.writeHeaderFromRowSignature(exportRowSignature, false);
- }
-
- SequenceUtils.forEach(
- cursorSequence,
- cursor -> {
- try {
- final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
-
- //noinspection rawtypes
- @SuppressWarnings("rawtypes")
- final List<BaseObjectColumnValueSelector> selectors =
- frameReader.signature()
- .getColumnNames()
- .stream()
- .map(columnSelectorFactory::makeColumnValueSelector)
- .collect(Collectors.toList());
-
- while (!cursor.isDone()) {
- formatter.writeRowStart();
- for (int j = 0; j < exportRowSignature.size(); j++) {
- String columnName = exportRowSignature.getColumnName(j);
- BaseObjectColumnValueSelector<?> selector = selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
- formatter.writeRowField(columnName, selector.getObject());
- }
- channelCounter.incrementRowCount();
- formatter.writeRowEnd();
- cursor.advance();
+ SequenceUtils.forEach(
+ cursorSequence,
+ cursor -> {
+ try {
+ final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();
+
+ //noinspection rawtypes
+ final List<BaseObjectColumnValueSelector> selectors =
+ frameReader.signature()
+ .getColumnNames()
+ .stream()
+ .map(columnSelectorFactory::makeColumnValueSelector)
+ .collect(Collectors.toList());
+
+ while (!cursor.isDone()) {
+ exportWriter.writeRowStart();
+ for (int j = 0; j < exportRowSignature.size(); j++) {
+ String columnName = exportRowSignature.getColumnName(j);
+ BaseObjectColumnValueSelector<?> selector = selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
+ exportWriter.writeRowField(columnName, selector.getObject());
}
- }
- catch (IOException e) {
- throw DruidException.forPersona(DruidException.Persona.USER)
- .ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build(e, "Exception occurred while writing file to the export location [%s].", exportFilePath);
+ channelCounter.incrementRowCount();
+ exportWriter.writeRowEnd();
+ cursor.advance();
}
}
- );
- formatter.writeResponseEnd();
+ catch (IOException e) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Exception occurred while writing file to the export location [%s].", exportFilePath);
+ }
+ }
+ );
+ }
+
+ private void createExportWriter() throws IOException
+ {
+ OutputStream stream = null;
+ try {
+ stream = storageConnector.write(exportFilePath);
+ exportWriter = exportFormat.createFormatter(stream, jsonMapper);
+ exportWriter.writeResponseStart();
+ exportWriter.writeHeaderFromRowSignature(exportRowSignature, false);
+ }
+ catch (IOException e) {
+ if (exportWriter != null) {
+ exportWriter.close();
+ }
+ if (stream != null) {
+ stream.close();
+ }
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Exception occurred while opening a stream to the export location [%s].", exportFilePath);
}
}
@@ -195,5 +209,9 @@ public class ExportResultsFrameProcessor implements FrameProcessor<Object>
public void cleanup() throws IOException
{
FrameProcessors.closeAll(inputChannels(), outputChannels());
+
+ if (exportWriter != null) {
+ exportWriter.close();
+ }
}
}
diff --git
a/processing/src/main/java/org/apache/druid/storage/StorageConnector.java
b/processing/src/main/java/org/apache/druid/storage/StorageConnector.java
index 3d1ead89b1e..d99d9469f08 100644
--- a/processing/src/main/java/org/apache/druid/storage/StorageConnector.java
+++ b/processing/src/main/java/org/apache/druid/storage/StorageConnector.java
@@ -90,12 +90,13 @@ public interface StorageConnector
/**
* Open an {@link OutputStream} for writing data to the path in the
underlying storage system.
- * Most implementations prepend the input path with a basePath.
+ * Most implementations prepend the input path with a basePath. If an object exists at the path,
+ * the existing object will be overwritten by the write operation.
* Callers are advised to namespace their files as there might be race conditions.
* The caller should take care of closing the stream when done or in case of
error.
*
- * @param path
- * @return
+ * @param path to write
+ * @return OutputStream to the path
* @throws IOException
*/
OutputStream write(String path) throws IOException;
diff --git
a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
index 225f3eb8537..3d96f8d43b1 100644
---
a/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
+++
b/processing/src/main/java/org/apache/druid/storage/local/LocalFileStorageConnector.java
@@ -101,7 +101,7 @@ public class LocalFileStorageConnector implements StorageConnector
{
File toWrite = fileWithBasePath(path);
FileUtils.mkdirp(toWrite.getParentFile());
- return Files.newOutputStream(toWrite.toPath(), StandardOpenOption.CREATE, StandardOpenOption.APPEND);
+ return Files.newOutputStream(toWrite.toPath());
}
/**
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]