cryptoe commented on code in PR #16381:
URL: https://github.com/apache/druid/pull/16381#discussion_r1590573340
##########
extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java:
##########
@@ -131,69 +133,76 @@ public ReturnOrAwait<Object> runIncrementally(IntSet
readableInputs) throws IOEx
if (inputChannel.isFinished()) {
return ReturnOrAwait.returnObject(exportFilePath);
} else {
+ if (exportWriter == null) {
+ createExportWriter();
+ }
exportFrame(inputChannel.read());
return ReturnOrAwait.awaitAll(1);
}
}
- private void exportFrame(final Frame frame) throws IOException
+ private void exportFrame(final Frame frame)
{
final Sequence<Cursor> cursorSequence =
new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
.makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY,
Granularities.ALL, false, null);
- // Add headers if we are writing to a new file.
- final boolean writeHeader = !storageConnector.pathExists(exportFilePath);
-
- try (OutputStream stream = storageConnector.write(exportFilePath)) {
- ResultFormat.Writer formatter = exportFormat.createFormatter(stream,
jsonMapper);
- formatter.writeResponseStart();
-
- if (writeHeader) {
- formatter.writeHeaderFromRowSignature(exportRowSignature, false);
- }
-
- SequenceUtils.forEach(
- cursorSequence,
- cursor -> {
- try {
- final ColumnSelectorFactory columnSelectorFactory =
cursor.getColumnSelectorFactory();
-
- //noinspection rawtypes
- @SuppressWarnings("rawtypes")
- final List<BaseObjectColumnValueSelector> selectors =
- frameReader.signature()
- .getColumnNames()
- .stream()
-
.map(columnSelectorFactory::makeColumnValueSelector)
- .collect(Collectors.toList());
-
- while (!cursor.isDone()) {
- formatter.writeRowStart();
- for (int j = 0; j < exportRowSignature.size(); j++) {
- String columnName = exportRowSignature.getColumnName(j);
- BaseObjectColumnValueSelector<?> selector =
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
- formatter.writeRowField(columnName, selector.getObject());
- }
- channelCounter.incrementRowCount();
- formatter.writeRowEnd();
- cursor.advance();
+ SequenceUtils.forEach(
+ cursorSequence,
+ cursor -> {
+ try {
+ final ColumnSelectorFactory columnSelectorFactory =
cursor.getColumnSelectorFactory();
+
+ //noinspection rawtypes
+ final List<BaseObjectColumnValueSelector> selectors =
+ frameReader.signature()
+ .getColumnNames()
+ .stream()
+ .map(columnSelectorFactory::makeColumnValueSelector)
+ .collect(Collectors.toList());
+
+ while (!cursor.isDone()) {
+ exportWriter.writeRowStart();
+ for (int j = 0; j < exportRowSignature.size(); j++) {
+ String columnName = exportRowSignature.getColumnName(j);
+ BaseObjectColumnValueSelector<?> selector =
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
+ exportWriter.writeRowField(columnName, selector.getObject());
}
- }
- catch (IOException e) {
- throw DruidException.forPersona(DruidException.Persona.USER)
-
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build(e, "Exception occurred while writing
file to the export location [%s].", exportFilePath);
+ channelCounter.incrementRowCount();
+ exportWriter.writeRowEnd();
+ cursor.advance();
}
}
- );
- formatter.writeResponseEnd();
+ catch (IOException e) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Exception occurred while writing
file to the export location [%s].", exportFilePath);
+ }
+ }
+ );
+ }
+
+ private void createExportWriter()
+ {
+ try {
+ OutputStream stream = storageConnector.write(exportFilePath);
+ exportWriter = exportFormat.createFormatter(stream, jsonMapper);
+ exportWriter.writeResponseStart();
+ exportWriter.writeHeaderFromRowSignature(exportRowSignature, false);
+ }
+ catch (IOException e) {
+ throw DruidException.forPersona(DruidException.Persona.USER)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(e, "Exception occurred while opening a stream
to the export location [%s].", exportFilePath);
}
}
@Override
public void cleanup() throws IOException
{
FrameProcessors.closeAll(inputChannels(), outputChannels());
+
Review Comment:
Ah, the problem is with the S3 connector, so I guess we cannot write a unit test (UT) for this.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]