This is an automated email from the ASF dual-hosted git repository.
adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a151bcfd12e Fix incorrect header names for certain export queries (#16096)
a151bcfd12e is described below
commit a151bcfd12e86b3ace7a9776cb7bc1e87ea2a017
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Mar 19 15:11:04 2024 +0530
Fix incorrect header names for certain export queries (#16096)
* Fix incorrect header names for certain queries
* Fix incorrect header names for certain queries
* Maintain upgrade compatibility
* Fix tests
* Change null handling
---
.../org/apache/druid/msq/exec/ControllerImpl.java | 3 +-
.../results/ExportResultsFrameProcessor.java | 52 +++++++---
.../ExportResultsFrameProcessorFactory.java | 18 +++-
.../org/apache/druid/msq/exec/MSQExportTest.java | 108 +++++++++++++++------
.../ExportResultsFrameProcessorFactoryTest.java | 52 ++++++++++
5 files changed, 183 insertions(+), 50 deletions(-)
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index c71941f5c07..e9d71239940 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1905,7 +1905,8 @@ public class ControllerImpl implements Controller
.processorFactory(new
ExportResultsFrameProcessorFactory(
queryId,
exportStorageProvider,
- resultFormat
+ resultFormat,
+ columnMappings
))
);
return builder.build();
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
index de65d3e9d7a..52697578b07 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
@@ -21,6 +21,8 @@ package org.apache.druid.msq.querykit.results;
import com.fasterxml.jackson.databind.ObjectMapper;
import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.error.DruidException;
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.channel.ReadableFrameChannel;
@@ -35,13 +37,14 @@ import org.apache.druid.java.util.common.Unit;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.msq.counters.ChannelCounters;
-import org.apache.druid.msq.querykit.QueryKitUtils;
import org.apache.druid.msq.util.SequenceUtils;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.StorageConnector;
@@ -60,6 +63,8 @@ public class ExportResultsFrameProcessor implements
FrameProcessor<Object>
private final ObjectMapper jsonMapper;
private final ChannelCounters channelCounter;
final String exportFilePath;
+ private final Object2IntMap<String> outputColumnNameToFrameColumnNumberMap;
+ private final RowSignature exportRowSignature;
public ExportResultsFrameProcessor(
final ReadableFrameChannel inputChannel,
@@ -68,7 +73,8 @@ public class ExportResultsFrameProcessor implements
FrameProcessor<Object>
final StorageConnector storageConnector,
final ObjectMapper jsonMapper,
final ChannelCounters channelCounter,
- final String exportFilePath
+ final String exportFilePath,
+ final ColumnMappings columnMappings
)
{
this.inputChannel = inputChannel;
@@ -78,6 +84,30 @@ public class ExportResultsFrameProcessor implements
FrameProcessor<Object>
this.jsonMapper = jsonMapper;
this.channelCounter = channelCounter;
this.exportFilePath = exportFilePath;
+ this.outputColumnNameToFrameColumnNumberMap = new
Object2IntOpenHashMap<>();
+ final RowSignature inputRowSignature = frameReader.signature();
+
+ if (columnMappings == null) {
+ // If the column mappings wasn't sent, fail the query to avoid inconsistency in file format.
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+ .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build("Received null columnMappings from
controller. This might be due to an upgrade.");
+ }
+ for (final ColumnMapping columnMapping : columnMappings.getMappings()) {
+ this.outputColumnNameToFrameColumnNumberMap.put(
+ columnMapping.getOutputColumn(),
+ frameReader.signature().indexOf(columnMapping.getQueryColumn())
+ );
+ }
+ final RowSignature.Builder exportRowSignatureBuilder =
RowSignature.builder();
+
+ for (String outputColumn : columnMappings.getOutputColumnNames()) {
+ exportRowSignatureBuilder.add(
+ outputColumn,
+
inputRowSignature.getColumnType(outputColumnNameToFrameColumnNumberMap.getInt(outputColumn)).orElse(null)
+ );
+ }
+ this.exportRowSignature = exportRowSignatureBuilder.build();
}
@Override
@@ -109,8 +139,6 @@ public class ExportResultsFrameProcessor implements
FrameProcessor<Object>
private void exportFrame(final Frame frame) throws IOException
{
- final RowSignature exportRowSignature =
createRowSignatureForExport(frameReader.signature());
-
final Sequence<Cursor> cursorSequence =
new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
.makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY,
Granularities.ALL, false, null);
@@ -135,7 +163,7 @@ public class ExportResultsFrameProcessor implements
FrameProcessor<Object>
//noinspection rawtypes
@SuppressWarnings("rawtypes")
final List<BaseObjectColumnValueSelector> selectors =
- exportRowSignature
+ frameReader.signature()
.getColumnNames()
.stream()
.map(columnSelectorFactory::makeColumnValueSelector)
@@ -144,7 +172,9 @@ public class ExportResultsFrameProcessor implements
FrameProcessor<Object>
while (!cursor.isDone()) {
formatter.writeRowStart();
for (int j = 0; j < exportRowSignature.size(); j++) {
- formatter.writeRowField(exportRowSignature.getColumnName(j),
selectors.get(j).getObject());
+ String columnName = exportRowSignature.getColumnName(j);
+ BaseObjectColumnValueSelector<?> selector =
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
+ formatter.writeRowField(columnName, selector.getObject());
}
channelCounter.incrementRowCount();
formatter.writeRowEnd();
@@ -162,16 +192,6 @@ public class ExportResultsFrameProcessor implements
FrameProcessor<Object>
}
}
- private static RowSignature createRowSignatureForExport(RowSignature
inputRowSignature)
- {
- RowSignature.Builder exportRowSignatureBuilder = RowSignature.builder();
- inputRowSignature.getColumnNames()
- .stream()
- .filter(name ->
!QueryKitUtils.PARTITION_BOOST_COLUMN.equals(name))
- .forEach(name -> exportRowSignatureBuilder.add(name,
inputRowSignature.getColumnType(name).orElse(null)));
- return exportRowSignatureBuilder.build();
- }
-
@Override
public void cleanup() throws IOException
{
diff --git
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java
index c9f9b6a40a8..5fe9b52191c 100644
---
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java
+++
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java
@@ -20,6 +20,7 @@
package org.apache.druid.msq.querykit.results;
import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonTypeName;
import org.apache.druid.error.DruidException;
@@ -41,6 +42,7 @@ import org.apache.druid.msq.kernel.FrameContext;
import org.apache.druid.msq.kernel.ProcessorsAndChannels;
import org.apache.druid.msq.kernel.StageDefinition;
import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
import org.apache.druid.sql.http.ResultFormat;
import org.apache.druid.storage.ExportStorageProvider;
import org.apache.druid.utils.CollectionUtils;
@@ -55,17 +57,20 @@ public class ExportResultsFrameProcessorFactory extends
BaseFrameProcessorFactor
private final String queryId;
private final ExportStorageProvider exportStorageProvider;
private final ResultFormat exportFormat;
+ private final ColumnMappings columnMappings;
@JsonCreator
public ExportResultsFrameProcessorFactory(
@JsonProperty("queryId") String queryId,
@JsonProperty("exportStorageProvider") ExportStorageProvider
exportStorageProvider,
- @JsonProperty("exportFormat") ResultFormat exportFormat
+ @JsonProperty("exportFormat") ResultFormat exportFormat,
+ @JsonProperty("columnMappings") @Nullable ColumnMappings columnMappings
)
{
this.queryId = queryId;
this.exportStorageProvider = exportStorageProvider;
this.exportFormat = exportFormat;
+ this.columnMappings = columnMappings;
}
@JsonProperty("queryId")
@@ -87,6 +92,14 @@ public class ExportResultsFrameProcessorFactory extends
BaseFrameProcessorFactor
return exportStorageProvider;
}
+ @JsonProperty("columnMappings")
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public ColumnMappings getColumnMappings()
+ {
+ return columnMappings;
+ }
+
@Override
public ProcessorsAndChannels<Object, Long> makeProcessors(
StageDefinition stageDefinition,
@@ -122,7 +135,8 @@ public class ExportResultsFrameProcessorFactory extends
BaseFrameProcessorFactor
exportStorageProvider.get(),
frameContext.jsonMapper(),
channelCounter,
- getExportFilePath(queryId, workerNumber,
readableInput.getStagePartition().getPartitionNumber(), exportFormat)
+ getExportFilePath(queryId, workerNumber,
readableInput.getStagePartition().getPartitionNumber(), exportFormat),
+ columnMappings
)
);
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
index e6c3b5e2931..4619335bb87 100644
---
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
@@ -26,15 +26,14 @@ import org.apache.druid.msq.test.MSQTestBase;
import org.apache.druid.msq.util.MultiStageQueryContext;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.export.TestExportStorageConnector;
-import org.apache.druid.sql.http.ResultFormat;
import org.junit.Assert;
import org.junit.Test;
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
-import java.nio.charset.Charset;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -46,14 +45,13 @@ public class MSQExportTest extends MSQTestBase
@Test
public void testExport() throws IOException
{
- TestExportStorageConnector storageConnector = (TestExportStorageConnector)
exportStorageConnectorProvider.get();
-
RowSignature rowSignature = RowSignature.builder()
.add("__time", ColumnType.LONG)
.add("dim1", ColumnType.STRING)
.add("cnt",
ColumnType.LONG).build();
- final String sql = StringUtils.format("insert into extern(%s()) as csv
select cnt, dim1 from foo", TestExportStorageConnector.TYPE_NAME);
+ File exportDir = temporaryFolder.newFolder("export/");
+ final String sql = StringUtils.format("insert into
extern(local(exportPath=>'%s')) as csv select cnt, dim1 as dim from foo",
exportDir.getAbsolutePath());
testIngestQuery().setSql(sql)
.setExpectedDataSource("foo1")
@@ -63,11 +61,47 @@ public class MSQExportTest extends MSQTestBase
.setExpectedResultRows(ImmutableList.of())
.verifyResults();
- List<Object[]> objects = expectedFooFileContents();
+ Assert.assertEquals(
+ 1,
+ Objects.requireNonNull(new
File(exportDir.getAbsolutePath()).listFiles()).length
+ );
+ File resultFile = new File(exportDir,
"query-test-query-worker0-partition0.csv");
+ List<String> results = readResultsFromFile(resultFile);
Assert.assertEquals(
- convertResultsToString(objects),
- new String(storageConnector.getByteArrayOutputStream().toByteArray(),
Charset.defaultCharset())
+ expectedFooFileContents(true),
+ results
+ );
+ }
+
+ @Test
+ public void testExport2() throws IOException
+ {
+ RowSignature rowSignature = RowSignature.builder()
+ .add("dim1", ColumnType.STRING)
+ .add("cnt",
ColumnType.LONG).build();
+
+ File exportDir = temporaryFolder.newFolder("export/");
+ final String sql = StringUtils.format("insert into
extern(local(exportPath=>'%s')) as csv select dim1 as table_dim, count(*) as
table_count from foo where dim1 = 'abc' group by 1",
exportDir.getAbsolutePath());
+
+ testIngestQuery().setSql(sql)
+ .setExpectedDataSource("foo1")
+ .setQueryContext(DEFAULT_MSQ_CONTEXT)
+ .setExpectedRowSignature(rowSignature)
+ .setExpectedSegment(ImmutableSet.of())
+ .setExpectedResultRows(ImmutableList.of())
+ .verifyResults();
+
+ Assert.assertEquals(
+ 1,
+ Objects.requireNonNull(new
File(exportDir.getAbsolutePath()).listFiles()).length
+ );
+
+ File resultFile = new File(exportDir,
"query-test-query-worker0-partition0.csv");
+ List<String> results = readResultsFromFile(resultFile);
+ Assert.assertEquals(
+ expectedFoo2FileContents(true),
+ results
);
}
@@ -95,36 +129,48 @@ public class MSQExportTest extends MSQTestBase
.verifyResults();
Assert.assertEquals(
- expectedFooFileContents().size(),
+ expectedFooFileContents(false).size(),
Objects.requireNonNull(new
File(exportDir.getAbsolutePath()).listFiles()).length
);
}
- private List<Object[]> expectedFooFileContents()
+ private List<String> expectedFooFileContents(boolean withHeader)
+ {
+ ArrayList<String> expectedResults = new ArrayList<>();
+ if (withHeader) {
+ expectedResults.add("cnt,dim");
+ }
+ expectedResults.addAll(ImmutableList.of(
+ "1,",
+ "1,10.1",
+ "1,2",
+ "1,1",
+ "1,def",
+ "1,abc"
+ )
+ );
+ return expectedResults;
+ }
+
+ private List<String> expectedFoo2FileContents(boolean withHeader)
{
- return new ArrayList<>(ImmutableList.of(
- new Object[]{"1", null},
- new Object[]{"1", 10.1},
- new Object[]{"1", 2},
- new Object[]{"1", 1},
- new Object[]{"1", "def"},
- new Object[]{"1", "abc"}
- ));
+ ArrayList<String> expectedResults = new ArrayList<>();
+ if (withHeader) {
+ expectedResults.add("table_dim,table_count");
+ }
+ expectedResults.addAll(ImmutableList.of("abc,1"));
+ return expectedResults;
}
- private String convertResultsToString(List<Object[]> expectedRows) throws
IOException
+ private List<String> readResultsFromFile(File resultFile) throws IOException
{
- ByteArrayOutputStream expectedResult = new ByteArrayOutputStream();
- ResultFormat.Writer formatter =
ResultFormat.CSV.createFormatter(expectedResult, objectMapper);
- formatter.writeResponseStart();
- for (Object[] row : expectedRows) {
- formatter.writeRowStart();
- for (Object object : row) {
- formatter.writeRowField("", object);
+ List<String> results = new ArrayList<>();
+ try (BufferedReader br = new BufferedReader(new
InputStreamReader(Files.newInputStream(resultFile.toPath()),
StringUtils.UTF8_STRING))) {
+ String line;
+ while (!(line = br.readLine()).isEmpty()) {
+ results.add(line);
}
- formatter.writeRowEnd();
+ return results;
}
- formatter.writeResponseEnd();
- return new String(expectedResult.toByteArray(), Charset.defaultCharset());
}
}
diff --git
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactoryTest.java
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactoryTest.java
new file mode 100644
index 00000000000..90f19164770
--- /dev/null
+++
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactoryTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.results;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.storage.StorageConfig;
+import org.apache.druid.storage.StorageConnectorModule;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class ExportResultsFrameProcessorFactoryTest
+{
+ @Test
+ public void testSerde() throws IOException
+ {
+ String exportFactoryString =
"{\"type\":\"exportResults\",\"queryId\":\"query-9128ieio9wq\",\"exportStorageProvider\":{\"type\":\"local\",\"exportPath\":\"/path\"},\"exportFormat\":\"csv\",\"resultTypeReference\":{\"type\":\"java.lang.Object\"}}";
+
+ ObjectMapper objectMapper = new DefaultObjectMapper();
+ objectMapper.registerModules(new
StorageConnectorModule().getJacksonModules());
+ objectMapper.setInjectableValues(
+ new InjectableValues.Std()
+ .addValue(StorageConfig.class, new StorageConfig("/"))
+ );
+
+ ExportResultsFrameProcessorFactory exportResultsFrameProcessorFactory =
objectMapper.readValue(
+ exportFactoryString,
+ ExportResultsFrameProcessorFactory.class
+ );
+ Assert.assertNull(exportResultsFrameProcessorFactory.getColumnMappings());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]