LakshSingla commented on code in PR #15689: URL: https://github.com/apache/druid/pull/15689#discussion_r1470559070
########## sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.destination; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.Map; +import java.util.Objects; + +/** + * Destination that represents an ingestion to an external source. 
+ */ +@JsonTypeName(ExportDestination.TYPE_KEY) +public class ExportDestination implements IngestDestination +{ + public static final String TYPE_KEY = "external"; + private final String destinationType; + private final Map<String, String> properties; + + public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties) + { + this.destinationType = destinationType; + this.properties = properties; + } + + @JsonProperty("destinationType") + public String getDestinationType() + { + return destinationType; + } + + @JsonProperty("properties") + public Map<String, String> getProperties() Review Comment: Can we have subtypes, like we do for something like a storage connector? We'd have `S3ExportDestination`, and we can expand it further later. I think that looks better than a generic properties map ########## sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.destination; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.Map; +import java.util.Objects; + +/** + * Destination that represents an ingestion to an external source. + */ +@JsonTypeName(ExportDestination.TYPE_KEY) +public class ExportDestination implements IngestDestination +{ + public static final String TYPE_KEY = "external"; + private final String destinationType; + private final Map<String, String> properties; + + public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties) + { + this.destinationType = destinationType; + this.properties = properties; + } + + @JsonProperty("destinationType") Review Comment: Can we use something more appropriate than "destinationType"? "destination" seems cleaner ########## sql/src/main/java/org/apache/druid/sql/destination/IngestDestination.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.sql.destination; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.guice.annotations.UnstableApi; + +/** + * Represents the destination to which the ingested data is written to. Review Comment: ```suggestion * Represents the destination where the data is ingested ``` ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.msq.querykit.results; + +import com.fasterxml.jackson.databind.ObjectMapper; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessors; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.segment.FrameStorageAdapter; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.Unit; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.util.SequenceUtils; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.http.ResultFormat; +import org.apache.druid.storage.StorageConnector; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class ExportResultsFrameProcessor implements FrameProcessor<Object> +{ + private final ReadableFrameChannel inputChannel; + private final ResultFormat exportFormat; + private final FrameReader frameReader; + private final StorageConnector storageConnector; + private final ObjectMapper jsonMapper; + private final int partitionNumber; + private final int workerNumber; + private final ChannelCounters channelCounter; + + public 
ExportResultsFrameProcessor( + ReadableFrameChannel inputChannel, + ResultFormat exportFormat, + FrameReader frameReader, + StorageConnector storageConnector, + ObjectMapper jsonMapper, + int partitionNumber, + int workerNumber, + ChannelCounters channelCounter + ) + { + this.inputChannel = inputChannel; + this.exportFormat = exportFormat; + this.frameReader = frameReader; + this.storageConnector = storageConnector; + this.jsonMapper = jsonMapper; + this.partitionNumber = partitionNumber; + this.workerNumber = workerNumber; + this.channelCounter = channelCounter; + } + + @Override + public List<ReadableFrameChannel> inputChannels() + { + return Collections.singletonList(inputChannel); + } + + @Override + public List<WritableFrameChannel> outputChannels() + { + return Collections.emptyList(); + } + + @Override + public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException + { + if (readableInputs.isEmpty()) { + return ReturnOrAwait.awaitAll(1); + } + + if (inputChannel.isFinished()) { + return ReturnOrAwait.returnObject(Unit.instance()); + } else { + exportFrame(inputChannel.read()); + return ReturnOrAwait.awaitAll(1); + } + } + + private void exportFrame(final Frame frame) throws IOException + { + final RowSignature signature = frameReader.signature(); + + final Sequence<Cursor> cursorSequence = + new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY) + .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null); + + final String exportFilePath = getExportFilePath(workerNumber, partitionNumber, exportFormat); + try (OutputStream stream = storageConnector.write(exportFilePath)) { + ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper); + + SequenceUtils.forEach( + cursorSequence, + cursor -> { + try { + formatter.writeResponseStart(); Review Comment: Should this be written per cursor level or file level? 
If the former, then the current code is good, however, if it's the latter then this should be written outside the .forEach(). The FrameStorageAdapter returns a single cursor, so this doesn't make a difference, however, if that were to change in the future, the code should be correct. ########## sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.destination; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.Map; +import java.util.Objects; + +/** + * Destination that represents an ingestion to an external source. 
+ */ +@JsonTypeName(ExportDestination.TYPE_KEY) +public class ExportDestination implements IngestDestination +{ + public static final String TYPE_KEY = "external"; + private final String destinationType; + private final Map<String, String> properties; + + public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties) + { + this.destinationType = destinationType; + this.properties = properties; + } + + @JsonProperty("destinationType") + public String getDestinationType() + { + return destinationType; + } + + @JsonProperty("properties") + public Map<String, String> getProperties() + { + return properties; + } + + @Override + @JsonIgnore + public String getDestinationName() + { + return "EXTERN"; Review Comment: Why is there a difference between this "EXTERN" and the ExportDestination.TYPE_KEY? Let's remove this, add a `getType()` method (which will probably mean that we don't need to have a JsonIgnore on it because it'd get serialized anyways) and use the .getType() instead of `getDestinationName()`. ########## sql/src/test/java/org/apache/druid/sql/calcite/CalciteExportTest.java: ########## @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.calcite; + +import org.apache.druid.error.DruidException; +import org.apache.druid.query.Druids; +import org.apache.druid.query.scan.ScanQuery; +import org.apache.druid.segment.column.ColumnType; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.calcite.filtration.Filtration; +import org.junit.Test; + +public class CalciteExportTest extends CalciteIngestionDmlTest +{ + @Test + public void testReplaceIntoExtern() + { + testIngestionQuery() + .sql("REPLACE INTO EXTERN(s3(bucket='bucket1',prefix='prefix1',tempDir='/tempdir',chunkSize='5242880',maxRetry='1')) " + + "AS CSV " + + "OVERWRITE ALL " + + "SELECT dim2 FROM foo") + .expectQuery( + Druids.newScanQueryBuilder() + .dataSource( + "foo" + ) + .intervals(querySegmentSpec(Filtration.eternity())) + .columns("dim2") + .resultFormat(ScanQuery.ResultFormat.RESULT_FORMAT_COMPACTED_LIST) + .legacy(false) + .build() + ) + .expectResources(dataSourceRead("foo")) + .expectTarget("EXTERN", RowSignature.builder().add("dim2", ColumnType.STRING).build()) + .verify(); + } + + @Test + public void testExportWithPartitionedBy() + { + testIngestionQuery() + .sql("REPLACE INTO EXTERN(s3(bucket='bucket1',prefix='prefix1',tempDir='/tempdir',chunkSize='5242880',maxRetry='1')) " + + "AS CSV " + + "OVERWRITE ALL " + + "SELECT dim2 FROM foo " + + "PARTITIONED BY ALL") + .expectValidationError( + DruidException.class, + "Export statements do not currently support a PARTITIONED BY or CLUSTERED BY clause." Review Comment: ```suggestion "Export statements do not support a PARTITIONED BY or CLUSTERED BY clause." ``` A change in the error message ########## sql/src/main/java/org/apache/druid/sql/destination/IngestDestination.java: ########## @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.destination; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.guice.annotations.UnstableApi; + +/** + * Represents the destination to which the ingested data is written to. + */ +@UnstableApi +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type") +public interface IngestDestination +{ + String getDestinationName(); Review Comment: `String getType()` (as suggested in the previous comment) ########## sql/src/main/java/org/apache/druid/sql/destination/ExportDestination.java: ########## @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.sql.destination; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; + +import java.util.Map; +import java.util.Objects; + +/** + * Destination that represents an ingestion to an external source. + */ +@JsonTypeName(ExportDestination.TYPE_KEY) +public class ExportDestination implements IngestDestination +{ + public static final String TYPE_KEY = "external"; + private final String destinationType; + private final Map<String, String> properties; + + public ExportDestination(@JsonProperty("destinationType") String destinationType, @JsonProperty("properties") Map<String, String> properties) + { + this.destinationType = destinationType; + this.properties = properties; + } + + @JsonProperty("destinationType") + public String getDestinationType() + { + return destinationType; + } + + @JsonProperty("properties") + public Map<String, String> getProperties() + { + return properties; + } + + @Override + @JsonIgnore + public String getDestinationName() + { + return "EXTERN"; Review Comment: Something like what Query<T> class does ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.msq.querykit.results; + +import com.fasterxml.jackson.databind.ObjectMapper; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessors; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.segment.FrameStorageAdapter; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.Unit; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.util.SequenceUtils; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.http.ResultFormat; +import org.apache.druid.storage.StorageConnector; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class 
ExportResultsFrameProcessor implements FrameProcessor<Object> +{ + private final ReadableFrameChannel inputChannel; + private final ResultFormat exportFormat; + private final FrameReader frameReader; + private final StorageConnector storageConnector; + private final ObjectMapper jsonMapper; + private final int partitionNumber; + private final int workerNumber; + private final ChannelCounters channelCounter; + + public ExportResultsFrameProcessor( + ReadableFrameChannel inputChannel, + ResultFormat exportFormat, + FrameReader frameReader, + StorageConnector storageConnector, + ObjectMapper jsonMapper, + int partitionNumber, + int workerNumber, + ChannelCounters channelCounter + ) + { + this.inputChannel = inputChannel; + this.exportFormat = exportFormat; + this.frameReader = frameReader; + this.storageConnector = storageConnector; + this.jsonMapper = jsonMapper; + this.partitionNumber = partitionNumber; + this.workerNumber = workerNumber; + this.channelCounter = channelCounter; + } + + @Override + public List<ReadableFrameChannel> inputChannels() + { + return Collections.singletonList(inputChannel); + } + + @Override + public List<WritableFrameChannel> outputChannels() + { + return Collections.emptyList(); + } + + @Override + public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException + { + if (readableInputs.isEmpty()) { + return ReturnOrAwait.awaitAll(1); + } + + if (inputChannel.isFinished()) { + return ReturnOrAwait.returnObject(Unit.instance()); + } else { + exportFrame(inputChannel.read()); + return ReturnOrAwait.awaitAll(1); + } + } + + private void exportFrame(final Frame frame) throws IOException + { + final RowSignature signature = frameReader.signature(); + + final Sequence<Cursor> cursorSequence = + new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY) + .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null); + + final String exportFilePath = getExportFilePath(workerNumber, 
partitionNumber, exportFormat); Review Comment: This can be computed once in the constructor; it needn't be called for each frame. ########## extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java: ########## @@ -0,0 +1,172 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
+ */ + +package org.apache.druid.msq.querykit.results; + +import com.fasterxml.jackson.databind.ObjectMapper; +import it.unimi.dsi.fastutil.ints.IntSet; +import org.apache.druid.error.DruidException; +import org.apache.druid.frame.Frame; +import org.apache.druid.frame.channel.ReadableFrameChannel; +import org.apache.druid.frame.channel.WritableFrameChannel; +import org.apache.druid.frame.processor.FrameProcessor; +import org.apache.druid.frame.processor.FrameProcessors; +import org.apache.druid.frame.processor.ReturnOrAwait; +import org.apache.druid.frame.read.FrameReader; +import org.apache.druid.frame.segment.FrameStorageAdapter; +import org.apache.druid.java.util.common.Intervals; +import org.apache.druid.java.util.common.StringUtils; +import org.apache.druid.java.util.common.Unit; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.msq.counters.ChannelCounters; +import org.apache.druid.msq.util.SequenceUtils; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.VirtualColumns; +import org.apache.druid.segment.column.RowSignature; +import org.apache.druid.sql.http.ResultFormat; +import org.apache.druid.storage.StorageConnector; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; + +public class ExportResultsFrameProcessor implements FrameProcessor<Object> +{ + private final ReadableFrameChannel inputChannel; + private final ResultFormat exportFormat; + private final FrameReader frameReader; + private final StorageConnector storageConnector; + private final ObjectMapper jsonMapper; + private final int partitionNumber; + private final int workerNumber; + private final ChannelCounters channelCounter; + + public 
ExportResultsFrameProcessor( + ReadableFrameChannel inputChannel, + ResultFormat exportFormat, + FrameReader frameReader, + StorageConnector storageConnector, + ObjectMapper jsonMapper, + int partitionNumber, + int workerNumber, + ChannelCounters channelCounter + ) + { + this.inputChannel = inputChannel; + this.exportFormat = exportFormat; + this.frameReader = frameReader; + this.storageConnector = storageConnector; + this.jsonMapper = jsonMapper; + this.partitionNumber = partitionNumber; + this.workerNumber = workerNumber; + this.channelCounter = channelCounter; + } + + @Override + public List<ReadableFrameChannel> inputChannels() + { + return Collections.singletonList(inputChannel); + } + + @Override + public List<WritableFrameChannel> outputChannels() + { + return Collections.emptyList(); + } + + @Override + public ReturnOrAwait<Object> runIncrementally(IntSet readableInputs) throws IOException + { + if (readableInputs.isEmpty()) { + return ReturnOrAwait.awaitAll(1); + } + + if (inputChannel.isFinished()) { + return ReturnOrAwait.returnObject(Unit.instance()); + } else { + exportFrame(inputChannel.read()); + return ReturnOrAwait.awaitAll(1); + } + } + + private void exportFrame(final Frame frame) throws IOException + { + final RowSignature signature = frameReader.signature(); + + final Sequence<Cursor> cursorSequence = + new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY) + .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, Granularities.ALL, false, null); + + final String exportFilePath = getExportFilePath(workerNumber, partitionNumber, exportFormat); + try (OutputStream stream = storageConnector.write(exportFilePath)) { + ResultFormat.Writer formatter = exportFormat.createFormatter(stream, jsonMapper); + + SequenceUtils.forEach( + cursorSequence, + cursor -> { + try { + formatter.writeResponseStart(); Review Comment: There's a `writeHeader()` field in the `formatter`'s class. Is that call not required? 
Perhaps the user would like the CSV to be populated with the column names in a header row. That could also be a user-toggled input, something like what CSVParser does (hasHeaderRow); we could have a field like `writeHeaderRow`. This seems useful, particularly for CSV. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
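The subtype and `getType()` suggestions in the review above could be sketched roughly as follows. This is a hypothetical sketch, not the PR's actual code: the class and method names (`S3ExportDestination`, `getBucket`, `getPrefix`) are assumptions, and the Jackson annotations (`@JsonTypeName`, `@JsonProperty`) are omitted so the sketch stays self-contained.

```java
// Hypothetical sketch of the reviewer's suggestion: typed destination
// subclasses instead of a generic properties map, plus a getType()
// method that replaces getDestinationName(). Jackson annotations
// are omitted for brevity; field/class names are illustrative only.
interface IngestDestination
{
  String getType(); // would serve as the polymorphic "type" discriminator
}

final class S3ExportDestination implements IngestDestination
{
  static final String TYPE_KEY = "s3";

  private final String bucket;
  private final String prefix;

  S3ExportDestination(String bucket, String prefix)
  {
    this.bucket = bucket;
    this.prefix = prefix;
  }

  public String getBucket()
  {
    return bucket;
  }

  public String getPrefix()
  {
    return prefix;
  }

  @Override
  public String getType()
  {
    return TYPE_KEY; // single source of truth; no separate "EXTERN" string
  }
}

public class DestinationSketch
{
  public static void main(String[] args)
  {
    IngestDestination dest = new S3ExportDestination("bucket1", "prefix1");
    System.out.println(dest.getType());
  }
}
```

Typed fields give compile-time checking and a natural place to validate per-connector configuration, at the cost of one class per destination type.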

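The two comments on `ExportResultsFrameProcessor` (hoist the export-path computation into the constructor; write the header/start markers once per file rather than once per cursor) could be combined into something like the sketch below. Everything here is illustrative: `SketchExportProcessor` stands in for the real processor, the plain string writer stands in for `ResultFormat.Writer`, and `writeHeaderRow` is the hypothetical toggle the reviewer proposes.

```java
import java.io.StringWriter;

// Hypothetical sketch: the export file path is computed once in the
// constructor (not on every exportFrame call), and the header row is
// written once per file, guarded by a flag, rather than per frame/cursor.
class SketchExportProcessor
{
  private final String exportFilePath;   // computed once, up front
  private final boolean writeHeaderRow;  // reviewer's proposed toggle
  private final StringWriter out = new StringWriter();
  private boolean headerWritten = false;

  SketchExportProcessor(int workerNumber, int partitionNumber, boolean writeHeaderRow)
  {
    this.exportFilePath = "worker" + workerNumber + "-partition" + partitionNumber + ".csv";
    this.writeHeaderRow = writeHeaderRow;
  }

  void exportFrame(String[] columns, String[][] rows)
  {
    if (writeHeaderRow && !headerWritten) {
      // File-level: emitted exactly once, even across multiple frames.
      out.write(String.join(",", columns) + "\n");
      headerWritten = true;
    }
    for (String[] row : rows) {
      out.write(String.join(",", row) + "\n");
    }
  }

  String path()
  {
    return exportFilePath;
  }

  String contents()
  {
    return out.toString();
  }
}

public class ExportSketch
{
  public static void main(String[] args)
  {
    SketchExportProcessor p = new SketchExportProcessor(0, 1, true);
    p.exportFrame(new String[]{"dim2"}, new String[][]{{"a"}});
    // Second frame: rows are appended but no second header appears.
    p.exportFrame(new String[]{"dim2"}, new String[][]{{"b"}});
    System.out.println(p.path());
    System.out.print(p.contents());
  }
}
```

Guarding the header with a flag keeps the code correct even if a frame ever yields more than one cursor, which is the future-proofing concern raised in the thread.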