cryptoe commented on code in PR #16381:
URL: https://github.com/apache/druid/pull/16381#discussion_r1590816078
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -131,69 +133,78 @@ public ReturnOrAwait<Object> runIncrementally(IntSet
readableInputs) throws IOEx
if (inputChannel.isFinished()) {
return ReturnOrAwait.returnObject(exportFilePath);
} else {
+ if (exportWriter == null) {
+ createExportWriter();
+ }
exportFrame(inputChannel.read());
return ReturnOrAwait.awaitAll(1);
}
}
- private void exportFrame(final Frame frame) throws IOException
+ private void exportFrame(final Frame frame)
{
final Sequence<Cursor> cursorSequence =
new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
.makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY,
Granularities.ALL, false, null);
- // Add headers if we are writing to a new file.
- final boolean writeHeader = !storageConnector.pathExists(exportFilePath);
-
- try (OutputStream stream = storageConnector.write(exportFilePath)) {
- ResultFormat.Writer formatter = exportFormat.createFormatter(stream,
jsonMapper);
- formatter.writeResponseStart();
-
- if (writeHeader) {
- formatter.writeHeaderFromRowSignature(exportRowSignature, false);
- }
-
- SequenceUtils.forEach(
- cursorSequence,
- cursor -> {
- try {
- final ColumnSelectorFactory columnSelectorFactory =
cursor.getColumnSelectorFactory();
-
- //noinspection rawtypes
- @SuppressWarnings("rawtypes")
- final List<BaseObjectColumnValueSelector> selectors =
- frameReader.signature()
- .getColumnNames()
- .stream()
-
.map(columnSelectorFactory::makeColumnValueSelector)
- .collect(Collectors.toList());
-
- while (!cursor.isDone()) {
- formatter.writeRowStart();
- for (int j = 0; j < exportRowSignature.size(); j++) {
- String columnName = exportRowSignature.getColumnName(j);
- BaseObjectColumnValueSelector<?> selector =
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
- formatter.writeRowField(columnName, selector.getObject());
- }
- channelCounter.incrementRowCount();
- formatter.writeRowEnd();
- cursor.advance();
+ SequenceUtils.forEach(
+ cursorSequence,
+ cursor -> {
+ try {
+ final ColumnSelectorFactory columnSelectorFactory =
cursor.getColumnSelectorFactory();
+
+ //noinspection rawtypes
+ final List<BaseObjectColumnValueSelector> selectors =
+ frameReader.signature()
+ .getColumnNames()
+ .stream()
+ .map(columnSelectorFactory::makeColumnValueSelector)
+ .collect(Collectors.toList());
+
+ while (!cursor.isDone()) {
+ exportWriter.writeRowStart();
+ for (int j = 0; j < exportRowSignature.size(); j++) {
+ String columnName = exportRowSignature.getColumnName(j);
+ BaseObjectColumnValueSelector<?> selector =
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
+ exportWriter.writeRowField(columnName, selector.getObject());
}
- }
- catch (IOException e) {
- throw DruidException.forPersona(DruidException.Persona.USER)
-
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build(e, "Exception occurred while writing
file to the export location [%s].", exportFilePath);
+ channelCounter.incrementRowCount();
+ exportWriter.writeRowEnd();
+ cursor.advance();
}
}
- );
- formatter.writeResponseEnd();
+ catch (IOException e) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Exception occurred while writing
file to the export location [%s].", exportFilePath);
+ }
+ }
+ );
+ }
+
+ private void createExportWriter()
+ {
+ try {
+ OutputStream stream = storageConnector.write(exportFilePath);
+ exportWriter = exportFormat.createFormatter(stream, jsonMapper);
+ exportWriter.writeResponseStart();
+ exportWriter.writeHeaderFromRowSignature(exportRowSignature, false);
+ }
+ catch (IOException e) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
Review Comment:
Shouldn't we close the stream and the writer if an exception is thrown?
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -131,69 +133,78 @@ public ReturnOrAwait<Object> runIncrementally(IntSet
readableInputs) throws IOEx
if (inputChannel.isFinished()) {
return ReturnOrAwait.returnObject(exportFilePath);
} else {
+ if (exportWriter == null) {
+ createExportWriter();
+ }
exportFrame(inputChannel.read());
return ReturnOrAwait.awaitAll(1);
}
}
- private void exportFrame(final Frame frame) throws IOException
+ private void exportFrame(final Frame frame)
{
final Sequence<Cursor> cursorSequence =
new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
.makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY,
Granularities.ALL, false, null);
- // Add headers if we are writing to a new file.
- final boolean writeHeader = !storageConnector.pathExists(exportFilePath);
-
- try (OutputStream stream = storageConnector.write(exportFilePath)) {
- ResultFormat.Writer formatter = exportFormat.createFormatter(stream,
jsonMapper);
- formatter.writeResponseStart();
-
- if (writeHeader) {
- formatter.writeHeaderFromRowSignature(exportRowSignature, false);
- }
-
- SequenceUtils.forEach(
- cursorSequence,
- cursor -> {
- try {
- final ColumnSelectorFactory columnSelectorFactory =
cursor.getColumnSelectorFactory();
-
- //noinspection rawtypes
- @SuppressWarnings("rawtypes")
- final List<BaseObjectColumnValueSelector> selectors =
- frameReader.signature()
- .getColumnNames()
- .stream()
-
.map(columnSelectorFactory::makeColumnValueSelector)
- .collect(Collectors.toList());
-
- while (!cursor.isDone()) {
- formatter.writeRowStart();
- for (int j = 0; j < exportRowSignature.size(); j++) {
- String columnName = exportRowSignature.getColumnName(j);
- BaseObjectColumnValueSelector<?> selector =
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
- formatter.writeRowField(columnName, selector.getObject());
- }
- channelCounter.incrementRowCount();
- formatter.writeRowEnd();
- cursor.advance();
+ SequenceUtils.forEach(
+ cursorSequence,
+ cursor -> {
+ try {
+ final ColumnSelectorFactory columnSelectorFactory =
cursor.getColumnSelectorFactory();
+
+ //noinspection rawtypes
+ final List<BaseObjectColumnValueSelector> selectors =
+ frameReader.signature()
+ .getColumnNames()
+ .stream()
+ .map(columnSelectorFactory::makeColumnValueSelector)
+ .collect(Collectors.toList());
+
+ while (!cursor.isDone()) {
+ exportWriter.writeRowStart();
+ for (int j = 0; j < exportRowSignature.size(); j++) {
+ String columnName = exportRowSignature.getColumnName(j);
+ BaseObjectColumnValueSelector<?> selector =
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
+ exportWriter.writeRowField(columnName, selector.getObject());
}
- }
- catch (IOException e) {
- throw DruidException.forPersona(DruidException.Persona.USER)
-
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build(e, "Exception occurred while writing
file to the export location [%s].", exportFilePath);
+ channelCounter.incrementRowCount();
+ exportWriter.writeRowEnd();
+ cursor.advance();
}
}
- );
- formatter.writeResponseEnd();
+ catch (IOException e) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Exception occurred while writing
file to the export location [%s].", exportFilePath);
+ }
+ }
+ );
+ }
+
+ private void createExportWriter()
+ {
+ try {
+ OutputStream stream = storageConnector.write(exportFilePath);
+ exportWriter = exportFormat.createFormatter(stream, jsonMapper);
+ exportWriter.writeResponseStart();
+ exportWriter.writeHeaderFromRowSignature(exportRowSignature, false);
+ }
+ catch (IOException e) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
Review Comment:
Also, shouldn't exportWriter be volatile, since it may be shared across
multiple threads?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]