This is an automated email from the ASF dual-hosted git repository.

adarshsanjeev pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new a151bcfd12e Fix incorrect header names for certain export queries 
(#16096)
a151bcfd12e is described below

commit a151bcfd12e86b3ace7a9776cb7bc1e87ea2a017
Author: Adarsh Sanjeev <[email protected]>
AuthorDate: Tue Mar 19 15:11:04 2024 +0530

    Fix incorrect header names for certain export queries (#16096)
    
    * Fix incorrect header names for certain queries
    
    * Fix incorrect header names for certain queries
    
    * Maintain upgrade compatibility
    
    * Fix tests
    
    * Change null handling
---
 .../org/apache/druid/msq/exec/ControllerImpl.java  |   3 +-
 .../results/ExportResultsFrameProcessor.java       |  52 +++++++---
 .../ExportResultsFrameProcessorFactory.java        |  18 +++-
 .../org/apache/druid/msq/exec/MSQExportTest.java   | 108 +++++++++++++++------
 .../ExportResultsFrameProcessorFactoryTest.java    |  52 ++++++++++
 5 files changed, 183 insertions(+), 50 deletions(-)

diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
index c71941f5c07..e9d71239940 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/exec/ControllerImpl.java
@@ -1905,7 +1905,8 @@ public class ControllerImpl implements Controller
                                  .processorFactory(new 
ExportResultsFrameProcessorFactory(
                                      queryId,
                                      exportStorageProvider,
-                                     resultFormat
+                                     resultFormat,
+                                     columnMappings
                                  ))
       );
       return builder.build();
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
index de65d3e9d7a..52697578b07 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessor.java
@@ -21,6 +21,8 @@ package org.apache.druid.msq.querykit.results;
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import it.unimi.dsi.fastutil.ints.IntSet;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import org.apache.druid.error.DruidException;
 import org.apache.druid.frame.Frame;
 import org.apache.druid.frame.channel.ReadableFrameChannel;
@@ -35,13 +37,14 @@ import org.apache.druid.java.util.common.Unit;
 import org.apache.druid.java.util.common.granularity.Granularities;
 import org.apache.druid.java.util.common.guava.Sequence;
 import org.apache.druid.msq.counters.ChannelCounters;
-import org.apache.druid.msq.querykit.QueryKitUtils;
 import org.apache.druid.msq.util.SequenceUtils;
 import org.apache.druid.segment.BaseObjectColumnValueSelector;
 import org.apache.druid.segment.ColumnSelectorFactory;
 import org.apache.druid.segment.Cursor;
 import org.apache.druid.segment.VirtualColumns;
 import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.sql.calcite.planner.ColumnMapping;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
 import org.apache.druid.sql.http.ResultFormat;
 import org.apache.druid.storage.StorageConnector;
 
@@ -60,6 +63,8 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor<Object>
   private final ObjectMapper jsonMapper;
   private final ChannelCounters channelCounter;
   final String exportFilePath;
+  private final Object2IntMap<String> outputColumnNameToFrameColumnNumberMap;
+  private final RowSignature exportRowSignature;
 
   public ExportResultsFrameProcessor(
       final ReadableFrameChannel inputChannel,
@@ -68,7 +73,8 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor<Object>
       final StorageConnector storageConnector,
       final ObjectMapper jsonMapper,
       final ChannelCounters channelCounter,
-      final String exportFilePath
+      final String exportFilePath,
+      final ColumnMappings columnMappings
   )
   {
     this.inputChannel = inputChannel;
@@ -78,6 +84,30 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor<Object>
     this.jsonMapper = jsonMapper;
     this.channelCounter = channelCounter;
     this.exportFilePath = exportFilePath;
+    this.outputColumnNameToFrameColumnNumberMap = new 
Object2IntOpenHashMap<>();
+    final RowSignature inputRowSignature = frameReader.signature();
+
+    if (columnMappings == null) {
+      // If the column mappings wasn't sent, fail the query to avoid 
inconsistency in file format.
+      throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+                          .ofCategory(DruidException.Category.RUNTIME_FAILURE)
+                          .build("Received null columnMappings from 
controller. This might be due to an upgrade.");
+    }
+    for (final ColumnMapping columnMapping : columnMappings.getMappings()) {
+      this.outputColumnNameToFrameColumnNumberMap.put(
+          columnMapping.getOutputColumn(),
+          frameReader.signature().indexOf(columnMapping.getQueryColumn())
+      );
+    }
+    final RowSignature.Builder exportRowSignatureBuilder = 
RowSignature.builder();
+
+    for (String outputColumn : columnMappings.getOutputColumnNames()) {
+      exportRowSignatureBuilder.add(
+          outputColumn,
+          
inputRowSignature.getColumnType(outputColumnNameToFrameColumnNumberMap.getInt(outputColumn)).orElse(null)
+      );
+    }
+    this.exportRowSignature = exportRowSignatureBuilder.build();
   }
 
   @Override
@@ -109,8 +139,6 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor<Object>
 
   private void exportFrame(final Frame frame) throws IOException
   {
-    final RowSignature exportRowSignature = 
createRowSignatureForExport(frameReader.signature());
-
     final Sequence<Cursor> cursorSequence =
         new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY)
             .makeCursors(null, Intervals.ETERNITY, VirtualColumns.EMPTY, 
Granularities.ALL, false, null);
@@ -135,7 +163,7 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor<Object>
               //noinspection rawtypes
               @SuppressWarnings("rawtypes")
               final List<BaseObjectColumnValueSelector> selectors =
-                  exportRowSignature
+                  frameReader.signature()
                              .getColumnNames()
                              .stream()
                              
.map(columnSelectorFactory::makeColumnValueSelector)
@@ -144,7 +172,9 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor<Object>
               while (!cursor.isDone()) {
                 formatter.writeRowStart();
                 for (int j = 0; j < exportRowSignature.size(); j++) {
-                  formatter.writeRowField(exportRowSignature.getColumnName(j), 
selectors.get(j).getObject());
+                  String columnName = exportRowSignature.getColumnName(j);
+                  BaseObjectColumnValueSelector<?> selector = 
selectors.get(outputColumnNameToFrameColumnNumberMap.getInt(columnName));
+                  formatter.writeRowField(columnName, selector.getObject());
                 }
                 channelCounter.incrementRowCount();
                 formatter.writeRowEnd();
@@ -162,16 +192,6 @@ public class ExportResultsFrameProcessor implements 
FrameProcessor<Object>
     }
   }
 
-  private static RowSignature createRowSignatureForExport(RowSignature 
inputRowSignature)
-  {
-    RowSignature.Builder exportRowSignatureBuilder = RowSignature.builder();
-    inputRowSignature.getColumnNames()
-                     .stream()
-                     .filter(name -> 
!QueryKitUtils.PARTITION_BOOST_COLUMN.equals(name))
-                     .forEach(name -> exportRowSignatureBuilder.add(name, 
inputRowSignature.getColumnType(name).orElse(null)));
-    return exportRowSignatureBuilder.build();
-  }
-
   @Override
   public void cleanup() throws IOException
   {
diff --git 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java
 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java
index c9f9b6a40a8..5fe9b52191c 100644
--- 
a/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java
+++ 
b/extensions-core/multi-stage-query/src/main/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactory.java
@@ -20,6 +20,7 @@
 package org.apache.druid.msq.querykit.results;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.fasterxml.jackson.annotation.JsonTypeName;
 import org.apache.druid.error.DruidException;
@@ -41,6 +42,7 @@ import org.apache.druid.msq.kernel.FrameContext;
 import org.apache.druid.msq.kernel.ProcessorsAndChannels;
 import org.apache.druid.msq.kernel.StageDefinition;
 import org.apache.druid.msq.querykit.BaseFrameProcessorFactory;
+import org.apache.druid.sql.calcite.planner.ColumnMappings;
 import org.apache.druid.sql.http.ResultFormat;
 import org.apache.druid.storage.ExportStorageProvider;
 import org.apache.druid.utils.CollectionUtils;
@@ -55,17 +57,20 @@ public class ExportResultsFrameProcessorFactory extends 
BaseFrameProcessorFactor
   private final String queryId;
   private final ExportStorageProvider exportStorageProvider;
   private final ResultFormat exportFormat;
+  private final ColumnMappings columnMappings;
 
   @JsonCreator
   public ExportResultsFrameProcessorFactory(
       @JsonProperty("queryId") String queryId,
       @JsonProperty("exportStorageProvider") ExportStorageProvider 
exportStorageProvider,
-      @JsonProperty("exportFormat") ResultFormat exportFormat
+      @JsonProperty("exportFormat") ResultFormat exportFormat,
+      @JsonProperty("columnMappings") @Nullable ColumnMappings columnMappings
   )
   {
     this.queryId = queryId;
     this.exportStorageProvider = exportStorageProvider;
     this.exportFormat = exportFormat;
+    this.columnMappings = columnMappings;
   }
 
   @JsonProperty("queryId")
@@ -87,6 +92,14 @@ public class ExportResultsFrameProcessorFactory extends 
BaseFrameProcessorFactor
     return exportStorageProvider;
   }
 
+  @JsonProperty("columnMappings")
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  @Nullable
+  public ColumnMappings getColumnMappings()
+  {
+    return columnMappings;
+  }
+
   @Override
   public ProcessorsAndChannels<Object, Long> makeProcessors(
       StageDefinition stageDefinition,
@@ -122,7 +135,8 @@ public class ExportResultsFrameProcessorFactory extends 
BaseFrameProcessorFactor
             exportStorageProvider.get(),
             frameContext.jsonMapper(),
             channelCounter,
-            getExportFilePath(queryId, workerNumber, 
readableInput.getStagePartition().getPartitionNumber(), exportFormat)
+            getExportFilePath(queryId, workerNumber, 
readableInput.getStagePartition().getPartitionNumber(), exportFormat),
+            columnMappings
         )
     );
 
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
index e6c3b5e2931..4619335bb87 100644
--- 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/exec/MSQExportTest.java
@@ -26,15 +26,14 @@ import org.apache.druid.msq.test.MSQTestBase;
 import org.apache.druid.msq.util.MultiStageQueryContext;
 import org.apache.druid.segment.column.ColumnType;
 import org.apache.druid.segment.column.RowSignature;
-import org.apache.druid.sql.calcite.export.TestExportStorageConnector;
-import org.apache.druid.sql.http.ResultFormat;
 import org.junit.Assert;
 import org.junit.Test;
 
-import java.io.ByteArrayOutputStream;
+import java.io.BufferedReader;
 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
+import java.io.InputStreamReader;
+import java.nio.file.Files;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -46,14 +45,13 @@ public class MSQExportTest extends MSQTestBase
   @Test
   public void testExport() throws IOException
   {
-    TestExportStorageConnector storageConnector = (TestExportStorageConnector) 
exportStorageConnectorProvider.get();
-
     RowSignature rowSignature = RowSignature.builder()
                                             .add("__time", ColumnType.LONG)
                                             .add("dim1", ColumnType.STRING)
                                             .add("cnt", 
ColumnType.LONG).build();
 
-    final String sql = StringUtils.format("insert into extern(%s()) as csv 
select cnt, dim1 from foo", TestExportStorageConnector.TYPE_NAME);
+    File exportDir = temporaryFolder.newFolder("export/");
+    final String sql = StringUtils.format("insert into 
extern(local(exportPath=>'%s')) as csv select cnt, dim1 as dim from foo", 
exportDir.getAbsolutePath());
 
     testIngestQuery().setSql(sql)
                      .setExpectedDataSource("foo1")
@@ -63,11 +61,47 @@ public class MSQExportTest extends MSQTestBase
                      .setExpectedResultRows(ImmutableList.of())
                      .verifyResults();
 
-    List<Object[]> objects = expectedFooFileContents();
+    Assert.assertEquals(
+        1,
+        Objects.requireNonNull(new 
File(exportDir.getAbsolutePath()).listFiles()).length
+    );
 
+    File resultFile = new File(exportDir, 
"query-test-query-worker0-partition0.csv");
+    List<String> results = readResultsFromFile(resultFile);
     Assert.assertEquals(
-        convertResultsToString(objects),
-        new String(storageConnector.getByteArrayOutputStream().toByteArray(), 
Charset.defaultCharset())
+        expectedFooFileContents(true),
+        results
+    );
+  }
+
+  @Test
+  public void testExport2() throws IOException
+  {
+    RowSignature rowSignature = RowSignature.builder()
+                                            .add("dim1", ColumnType.STRING)
+                                            .add("cnt", 
ColumnType.LONG).build();
+
+    File exportDir = temporaryFolder.newFolder("export/");
+    final String sql = StringUtils.format("insert into 
extern(local(exportPath=>'%s')) as csv select dim1 as table_dim, count(*) as 
table_count from foo where dim1 = 'abc' group by 1", 
exportDir.getAbsolutePath());
+
+    testIngestQuery().setSql(sql)
+                     .setExpectedDataSource("foo1")
+                     .setQueryContext(DEFAULT_MSQ_CONTEXT)
+                     .setExpectedRowSignature(rowSignature)
+                     .setExpectedSegment(ImmutableSet.of())
+                     .setExpectedResultRows(ImmutableList.of())
+                     .verifyResults();
+
+    Assert.assertEquals(
+        1,
+        Objects.requireNonNull(new 
File(exportDir.getAbsolutePath()).listFiles()).length
+    );
+
+    File resultFile = new File(exportDir, 
"query-test-query-worker0-partition0.csv");
+    List<String> results = readResultsFromFile(resultFile);
+    Assert.assertEquals(
+        expectedFoo2FileContents(true),
+        results
     );
   }
 
@@ -95,36 +129,48 @@ public class MSQExportTest extends MSQTestBase
                      .verifyResults();
 
     Assert.assertEquals(
-        expectedFooFileContents().size(),
+        expectedFooFileContents(false).size(),
         Objects.requireNonNull(new 
File(exportDir.getAbsolutePath()).listFiles()).length
     );
   }
 
-  private List<Object[]> expectedFooFileContents()
+  private List<String> expectedFooFileContents(boolean withHeader)
+  {
+    ArrayList<String> expectedResults = new ArrayList<>();
+    if (withHeader) {
+      expectedResults.add("cnt,dim");
+    }
+    expectedResults.addAll(ImmutableList.of(
+                               "1,",
+                               "1,10.1",
+                               "1,2",
+                               "1,1",
+                               "1,def",
+                               "1,abc"
+                           )
+    );
+    return expectedResults;
+  }
+
+  private List<String> expectedFoo2FileContents(boolean withHeader)
   {
-    return new ArrayList<>(ImmutableList.of(
-        new Object[]{"1", null},
-        new Object[]{"1", 10.1},
-        new Object[]{"1", 2},
-        new Object[]{"1", 1},
-        new Object[]{"1", "def"},
-        new Object[]{"1", "abc"}
-    ));
+    ArrayList<String> expectedResults = new ArrayList<>();
+    if (withHeader) {
+      expectedResults.add("table_dim,table_count");
+    }
+    expectedResults.addAll(ImmutableList.of("abc,1"));
+    return expectedResults;
   }
 
-  private String convertResultsToString(List<Object[]> expectedRows) throws 
IOException
+  private List<String> readResultsFromFile(File resultFile) throws IOException
   {
-    ByteArrayOutputStream expectedResult = new ByteArrayOutputStream();
-    ResultFormat.Writer formatter = 
ResultFormat.CSV.createFormatter(expectedResult, objectMapper);
-    formatter.writeResponseStart();
-    for (Object[] row : expectedRows) {
-      formatter.writeRowStart();
-      for (Object object : row) {
-        formatter.writeRowField("", object);
+    List<String> results = new ArrayList<>();
+    try (BufferedReader br = new BufferedReader(new 
InputStreamReader(Files.newInputStream(resultFile.toPath()), 
StringUtils.UTF8_STRING))) {
+      String line;
+      while (!(line = br.readLine()).isEmpty()) {
+        results.add(line);
       }
-      formatter.writeRowEnd();
+      return results;
     }
-    formatter.writeResponseEnd();
-    return new String(expectedResult.toByteArray(), Charset.defaultCharset());
   }
 }
diff --git 
a/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactoryTest.java
 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactoryTest.java
new file mode 100644
index 00000000000..90f19164770
--- /dev/null
+++ 
b/extensions-core/multi-stage-query/src/test/java/org/apache/druid/msq/querykit/results/ExportResultsFrameProcessorFactoryTest.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.msq.querykit.results;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.storage.StorageConfig;
+import org.apache.druid.storage.StorageConnectorModule;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+
+public class ExportResultsFrameProcessorFactoryTest
+{
+  @Test
+  public void testSerde() throws IOException
+  {
+    String exportFactoryString = 
"{\"type\":\"exportResults\",\"queryId\":\"query-9128ieio9wq\",\"exportStorageProvider\":{\"type\":\"local\",\"exportPath\":\"/path\"},\"exportFormat\":\"csv\",\"resultTypeReference\":{\"type\":\"java.lang.Object\"}}";
+
+    ObjectMapper objectMapper = new DefaultObjectMapper();
+    objectMapper.registerModules(new 
StorageConnectorModule().getJacksonModules());
+    objectMapper.setInjectableValues(
+        new InjectableValues.Std()
+            .addValue(StorageConfig.class, new StorageConfig("/"))
+    );
+
+    ExportResultsFrameProcessorFactory exportResultsFrameProcessorFactory = 
objectMapper.readValue(
+        exportFactoryString,
+        ExportResultsFrameProcessorFactory.class
+    );
+    Assert.assertNull(exportResultsFrameProcessorFactory.getColumnMappings());
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to