This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7062133323 Don't drop original field during flatten (#13490)
7062133323 is described below

commit 7062133323f573a3452d3cb4baaee40bdb408e47
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Jun 28 17:32:09 2024 -0700

    Don't drop original field during flatten (#13490)
---
 .../queries/JsonIngestionFromAvroQueriesTest.java  |  84 ++++-
 .../JsonUnnestIngestionFromAvroQueriesTest.java    | 394 +++++++++++++++++++++
 .../recordtransformer/ComplexTypeTransformer.java  |  24 +-
 .../recordtransformer/ExpressionTransformer.java   |  35 +-
 .../ComplexTypeTransformerTest.java                |  14 +-
 .../apache/pinot/spi/data/readers/GenericRow.java  |  14 +
 6 files changed, 537 insertions(+), 28 deletions(-)
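
Editor's note: the sketch below is illustrative and not part of the commit. It
shows the behavioral change using the @VisibleForTesting constructor that
ComplexTypeTransformerTest exercises; the "events" field and sample values are
hypothetical, and the enum constant assumes the default
collectionNotUnnestedToJson mode (NON_PRIMITIVE).

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.pinot.segment.local.recordtransformer.ComplexTypeTransformer;
import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
import org.apache.pinot.spi.data.readers.GenericRow;

public class UnnestKeepsOriginalField {
  public static void main(String[] args) {
    // Unnest the hypothetical "events" field.
    ComplexTypeTransformer transformer = new ComplexTypeTransformer(Arrays.asList("events"), ".",
        ComplexTypeConfig.CollectionNotUnnestedToJson.NON_PRIMITIVE, new HashMap<>(), null);

    GenericRow row = new GenericRow();
    row.putValue("id", 1);
    row.putValue("events", new ArrayList<>(Arrays.asList(
        new HashMap<>(Map.of("a", "1")), new HashMap<>(Map.of("a", "2")))));

    GenericRow transformed = transformer.transform(row);

    // Unnesting emits one row per array element under MULTIPLE_RECORDS_KEY.
    Collection<GenericRow> unnested =
        (Collection<GenericRow>) transformed.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
    for (GenericRow unnestedRow : unnested) {
      // Before this commit, "events" was dropped from each unnested row; now each
      // row keeps a deep copy of the original value next to the flattened leaf.
      System.out.println(unnestedRow.getValue("events") + " / events.a=" + unnestedRow.getValue("events.a"));
    }
  }
}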

diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
index b53f3a35d4..a6f376c6cd 100644
--- a/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonIngestionFromAvroQueriesTest.java
@@ -37,6 +37,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.file.DataFileWriter;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecordBuilder;
 import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pinot.common.function.scalar.StringFunctions;
@@ -80,6 +81,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
   private static final String JSON_COLUMN_2 = "jsonColumn2"; // for testing ENUM
   private static final String JSON_COLUMN_3 = "jsonColumn3"; // for testing FIXED
   private static final String JSON_COLUMN_4 = "jsonColumn4"; // for testing BYTES
+  private static final String JSON_COLUMN_5 = "jsonColumn5"; // for testing ARRAY of MAPS
   private static final String STRING_COLUMN = "stringColumn";
   private static final org.apache.pinot.spi.data.Schema SCHEMA =
       new org.apache.pinot.spi.data.Schema.SchemaBuilder().addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
@@ -87,6 +89,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
           .addSingleValueDimension(JSON_COLUMN_2, FieldSpec.DataType.JSON)
           .addSingleValueDimension(JSON_COLUMN_3, FieldSpec.DataType.JSON)
           .addSingleValueDimension(JSON_COLUMN_4, FieldSpec.DataType.JSON)
+          .addSingleValueDimension(JSON_COLUMN_5, FieldSpec.DataType.JSON)
           .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING).build();
   private static final TableConfig TABLE_CONFIG =
       new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).build();
@@ -111,7 +114,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
 
   /** @return {@link GenericRow} representing a row in Pinot table. */
   private static GenericRow createTableRecord(int intValue, String stringValue, Object jsonValue,
-      GenericData.EnumSymbol enumValue, GenericData.Fixed fixedValue, byte[] bytesValue) {
+      GenericData.EnumSymbol enumValue, GenericData.Fixed fixedValue, byte[] bytesValue, List<Object> arrayValue) {
     GenericRow record = new GenericRow();
     record.putValue(INT_COLUMN, intValue);
     record.putValue(STRING_COLUMN, stringValue);
@@ -119,6 +122,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
     record.putValue(JSON_COLUMN_2, enumValue);
     record.putValue(JSON_COLUMN_3, fixedValue);
     record.putValue(JSON_COLUMN_4, ByteBuffer.wrap(bytesValue));
+    record.putValue(JSON_COLUMN_5, arrayValue);
     return record;
   }
 
@@ -137,6 +141,13 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
     return createRecord("record", "doc", JsonIngestionFromAvroQueriesTest.class.getCanonicalName(), false, fields);
   }
 
+  private static Schema createJson5RecordSchema() {
+    List<Field> fields = new ArrayList<>();
+    fields.add(new Field("timestamp", create(Type.LONG)));
+    fields.add(new Field("data", createMap(create(Type.STRING))));
+    return createRecord("record", "doc", "JsonIngestionFromAvroQueriesTest$Json5", false, fields);
+  }
+
   private static GenericData.Record createRecordField(String k1, int v1, String k2, String v2) {
     GenericData.Record record = new GenericData.Record(createRecordSchema());
     record.put(k1, v1);
@@ -163,41 +174,65 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
         new Field(INT_COLUMN, createUnion(Lists.newArrayList(create(Type.INT), create(Type.NULL))), null, null),
         new Field(STRING_COLUMN, createUnion(Lists.newArrayList(create(Type.STRING), create(Type.NULL))), null, null),
         new Field(JSON_COLUMN_1,
-            createUnion(createArray(create(Type.STRING)), createMap(create(Type.STRING)), createRecordSchema(),
-                create(Type.STRING), create(Type.NULL))), new Field(JSON_COLUMN_2, enumSchema),
+            createUnion(
+                createArray(create(Type.STRING)),
+                createMap(create(Type.STRING)),
+                createRecordSchema(),
+                create(Type.STRING),
+                create(Type.NULL))),
+        new Field(JSON_COLUMN_2, enumSchema),
         new Field(JSON_COLUMN_3, fixedSchema),
-        new Field(JSON_COLUMN_4, create(Type.BYTES)));
+        new Field(JSON_COLUMN_4, create(Type.BYTES)),
+        new Field(JSON_COLUMN_5, createArray(createJson5RecordSchema()))
+    );
     avroSchema.setFields(fields);
     List<GenericRow> inputRecords = new ArrayList<>();
     // Insert ARRAY
     inputRecords.add(
         createTableRecord(1, "daffy duck", Arrays.asList("this", "is", "a", "test"), createEnumField(enumSchema, "UP"),
-            createFixedField(fixedSchema, 1), new byte[] {0, 0, 0, 1}));
+            createFixedField(fixedSchema, 1), new byte[] {0, 0, 0, 1}, Arrays.asList(
+                new GenericRecordBuilder(createJson5RecordSchema())
+                    .set("timestamp", 1719390721)
+                    .set("data", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")})).build())));
 
     // Insert MAP
     inputRecords.add(
         createTableRecord(2, "mickey mouse", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
-            createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 2), new byte[] {0, 0, 0, 2}));
+            createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 2), new byte[] {0, 0, 0, 2},
+            Arrays.asList(new GenericRecordBuilder(createJson5RecordSchema()).set("timestamp", 1719390722)
+                    .set("data", createMapField(new Pair[]{Pair.of("a", "2"), Pair.of("b", "4")})).build())));
+
     inputRecords.add(
         createTableRecord(3, "donald duck", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
-            createEnumField(enumSchema, "UP"), createFixedField(fixedSchema, 3), new byte[] {0, 0, 0, 3}));
+            createEnumField(enumSchema, "UP"), createFixedField(fixedSchema, 3), new byte[] {0, 0, 0, 3}, Arrays.asList(
+                new GenericRecordBuilder(createJson5RecordSchema()).set("timestamp", 1719390723)
+                    .set("data", createMapField(new Pair[]{Pair.of("a", "3"), Pair.of("b", "6")})).build())));
+
     inputRecords.add(
         createTableRecord(4, "scrooge mcduck", createMapField(new Pair[]{Pair.of("a", "1"), Pair.of("b", "2")}),
-            createEnumField(enumSchema, "LEFT"), createFixedField(fixedSchema, 4), new byte[] {0, 0, 0, 4}));
+            createEnumField(enumSchema, "LEFT"), createFixedField(fixedSchema, 4), new byte[] {0, 0, 0, 4},
+            Arrays.asList(new GenericRecordBuilder(createJson5RecordSchema()).set("timestamp", 1719390724)
+                    .set("data", createMapField(new Pair[]{Pair.of("a", "4"), Pair.of("b", "8")})).build())));
 
     // insert RECORD
     inputRecords.add(createTableRecord(5, "minney mouse", createRecordField("id", 1, "name", "minney"),
-        createEnumField(enumSchema, "RIGHT"), createFixedField(fixedSchema, 5), new byte[] {0, 0, 0, 5}));
+        createEnumField(enumSchema, "RIGHT"), createFixedField(fixedSchema, 5), new byte[] {0, 0, 0, 5}, Arrays.asList(
+            new GenericRecordBuilder(createJson5RecordSchema()).set("timestamp", 1719390725)
+                .set("data", createMapField(new Pair[]{Pair.of("a", "5"), Pair.of("b", "10")})).build())));
 
     // Insert simple Java String (gets converted into JSON value)
     inputRecords.add(
         createTableRecord(6, "pluto", "test", createEnumField(enumSchema, "DOWN"), createFixedField(fixedSchema, 6),
-            new byte[] {0, 0, 0, 6}));
+            new byte[] {0, 0, 0, 6}, Arrays.asList(
+                new GenericRecordBuilder(createJson5RecordSchema()).set("timestamp", 1719390726)
+                    .set("data", createMapField(new Pair[]{Pair.of("a", "6"), Pair.of("b", "12")})).build())));
 
     // Insert JSON string (gets converted into JSON document)
     inputRecords.add(
         createTableRecord(7, "scooby doo", "{\"name\":\"scooby\",\"id\":7}", createEnumField(enumSchema, "UP"),
-            createFixedField(fixedSchema, 7), new byte[] {0, 0, 0, 7}));
+            createFixedField(fixedSchema, 7), new byte[] {0, 0, 0, 7}, Arrays.asList(
+                new GenericRecordBuilder(createJson5RecordSchema()).set("timestamp", 1719390727)
+                    .set("data", createMapField(new Pair[]{Pair.of("a", "7"), Pair.of("b", "14")})).build())));
 
     try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
       fileWriter.create(avroSchema, AVRO_DATA_FILE);
@@ -209,6 +244,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
         record.put(JSON_COLUMN_2, inputRecord.getValue(JSON_COLUMN_2));
         record.put(JSON_COLUMN_3, inputRecord.getValue(JSON_COLUMN_3));
         record.put(JSON_COLUMN_4, inputRecord.getValue(JSON_COLUMN_4));
+        record.put(JSON_COLUMN_5, inputRecord.getValue(JSON_COLUMN_5));
         fileWriter.append(record);
       }
     }
@@ -223,6 +259,7 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
     set.add(JSON_COLUMN_2);
     set.add(JSON_COLUMN_3);
     set.add(JSON_COLUMN_4);
+    set.add(JSON_COLUMN_5);
     AvroRecordReader avroRecordReader = new AvroRecordReader();
     avroRecordReader.init(AVRO_DATA_FILE, set, null);
     return avroRecordReader;
@@ -336,6 +373,31 @@ public class JsonIngestionFromAvroQueriesTest extends BaseQueriesTest {
     testByteArray("select jsonColumn4 FROM testTable");
   }
 
+  @Test
+  public void testComplexSelectOnJsonColumn() {
+    Operator<SelectionResultsBlock> operator = getOperator(
+        "select jsonColumn5 FROM testTable");
+    SelectionResultsBlock block = operator.nextBlock();
+    Collection<Object[]> rows = block.getRows();
+    Assert.assertEquals(block.getDataSchema().getColumnDataType(0), DataSchema.ColumnDataType.JSON);
+
+    List<String> expecteds = Arrays.asList(
+        "[[{\"data\":{\"a\":\"1\",\"b\":\"2\"},\"timestamp\":1719390721}]]",
+        "[[{\"data\":{\"a\":\"2\",\"b\":\"4\"},\"timestamp\":1719390722}]]",
+        "[[{\"data\":{\"a\":\"3\",\"b\":\"6\"},\"timestamp\":1719390723}]]",
+        "[[{\"data\":{\"a\":\"4\",\"b\":\"8\"},\"timestamp\":1719390724}]]",
+        "[[{\"data\":{\"a\":\"5\",\"b\":\"10\"},\"timestamp\":1719390725}]]",
+        "[[{\"data\":{\"a\":\"6\",\"b\":\"12\"},\"timestamp\":1719390726}]]",
+        "[[{\"data\":{\"a\":\"7\",\"b\":\"14\"},\"timestamp\":1719390727}]]");
+
+    int index = 0;
+    Iterator<Object[]> iterator = rows.iterator();
+    while (iterator.hasNext()) {
+      Object[] row = iterator.next();
+      Assert.assertEquals(Arrays.toString(row), expecteds.get(index++));
+    }
+  }
+
   private void testByteArray(String query) {
     Operator<SelectionResultsBlock> operator = getOperator(query);
     SelectionResultsBlock block = operator.nextBlock();
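
Editor's note: the double-bracketed expected strings in
testComplexSelectOnJsonColumn come from printing a one-column Object[] row
through Arrays.toString; a minimal sketch (not part of the commit):

import java.util.Arrays;

public class ExpectedStringShape {
  public static void main(String[] args) {
    // The JSON column value is itself a JSON array; Arrays.toString then adds
    // the outer pair of brackets for the single-column row.
    String json = "[{\"data\":{\"a\":\"1\",\"b\":\"2\"},\"timestamp\":1719390721}]";
    System.out.println(Arrays.toString(new Object[] {json}));
    // prints: [[{"data":{"a":"1","b":"2"},"timestamp":1719390721}]]
  }
}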
diff --git a/pinot-core/src/test/java/org/apache/pinot/queries/JsonUnnestIngestionFromAvroQueriesTest.java b/pinot-core/src/test/java/org/apache/pinot/queries/JsonUnnestIngestionFromAvroQueriesTest.java
new file mode 100644
index 0000000000..bdbad4b803
--- /dev/null
+++ b/pinot-core/src/test/java/org/apache/pinot/queries/JsonUnnestIngestionFromAvroQueriesTest.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.queries;
+
+import com.google.common.collect.Lists;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.commons.io.FileUtils;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.Operator;
+import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock;
+import org.apache.pinot.plugin.inputformat.avro.AvroRecordReader;
+import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentLoader;
+import org.apache.pinot.segment.local.segment.creator.impl.SegmentIndexCreationDriverImpl;
+import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
+import org.apache.pinot.segment.spi.ImmutableSegment;
+import org.apache.pinot.segment.spi.IndexSegment;
+import org.apache.pinot.segment.spi.creator.SegmentGeneratorConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.ComplexTypeConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.GenericRow;
+import org.apache.pinot.spi.data.readers.RecordReader;
+import org.apache.pinot.spi.utils.ReadMode;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import static org.apache.avro.Schema.*;
+
+
+/**
+ * Test that a ComplexType ARRAY of RECORD field from an Avro file can be unnested during ingestion while the
+ * original field is also kept as a JSON column in the Pinot segment.
+ */
+public class JsonUnnestIngestionFromAvroQueriesTest extends BaseQueriesTest {
+  private static final File INDEX_DIR = new File(FileUtils.getTempDirectory(), "JsonIngestionFromAvroTest");
+  private static final File AVRO_DATA_FILE = new File(INDEX_DIR, "JsonIngestionFromAvroTest.avro");
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String SEGMENT_NAME = "testSegment";
+
+  private static final String INT_COLUMN = "intColumn";
+  private static final String JSON_COLUMN = "jsonColumn"; // for testing ARRAY of MAPS
+  private static final String STRING_COLUMN = "stringColumn";
+  private static final String EVENTTIME_JSON_COLUMN = "eventTimeColumn";
+  private static final org.apache.pinot.spi.data.Schema SCHEMA =
+      new org.apache.pinot.spi.data.Schema.SchemaBuilder()
+          .addSingleValueDimension(INT_COLUMN, FieldSpec.DataType.INT)
+          .addSingleValueDimension(STRING_COLUMN, FieldSpec.DataType.STRING)
+          .addSingleValueDimension(JSON_COLUMN, FieldSpec.DataType.JSON)
+          .addSingleValueDimension("jsonColumn.timestamp", 
FieldSpec.DataType.TIMESTAMP)
+          .addSingleValueDimension("jsonColumn.data", FieldSpec.DataType.JSON)
+          .addSingleValueDimension("jsonColumn.data.a", 
FieldSpec.DataType.STRING)
+          .addSingleValueDimension("jsonColumn.data.b", 
FieldSpec.DataType.STRING)
+          .addSingleValueDimension(EVENTTIME_JSON_COLUMN, 
FieldSpec.DataType.TIMESTAMP)
+          .addSingleValueDimension("eventTimeColumn_10m", 
FieldSpec.DataType.TIMESTAMP)
+          .build();
+  private static final TableConfig TABLE_CONFIG =
+      new TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setIngestionConfig(
+          new IngestionConfig(null, null, null, null,
+              List.of(new TransformConfig("eventTimeColumn", "eventTimeColumn.seconds * 1000"),
+                  new TransformConfig("eventTimeColumn_10m", "round(eventTimeColumn, 60000)")),
+              new ComplexTypeConfig(List.of(JSON_COLUMN), null, null, null), null, null, null)
+      ).build();
+
+  private IndexSegment _indexSegment;
+  private List<IndexSegment> _indexSegments;
+
+  @Override
+  protected String getFilter() {
+    return "";
+  }
+
+  @Override
+  protected IndexSegment getIndexSegment() {
+    return _indexSegment;
+  }
+
+  @Override
+  protected List<IndexSegment> getIndexSegments() {
+    return _indexSegments;
+  }
+
+  /** @return {@link GenericRow} representing a row in Pinot table. */
+  private static GenericRow createTableRecord(int intValue, String stringValue, List<Object> arrayValue,
+      Object eventTimeValue) {
+    GenericRow record = new GenericRow();
+    record.putValue(INT_COLUMN, intValue);
+    record.putValue(STRING_COLUMN, stringValue);
+    record.putValue(JSON_COLUMN, arrayValue);
+    record.putValue(EVENTTIME_JSON_COLUMN, eventTimeValue);
+    return record;
+  }
+
+  private static Schema createJsonRecordSchema() {
+    List<Field> fields = new ArrayList<>();
+    fields.add(new Field("timestamp", create(Type.LONG)));
+    fields.add(new Field("data", createMap(create(Type.STRING))));
+    return createRecord("record", "doc", JsonUnnestIngestionFromAvroQueriesTest.class.getCanonicalName() + "$Json",
+        false, fields);
+  }
+
+  private static Schema createEventTimeRecordSchema() {
+    List<Field> fields = new ArrayList<>();
+    fields.add(new Field("seconds", create(Type.LONG)));
+    return createRecord("record", "doc", JsonUnnestIngestionFromAvroQueriesTest.class.getCanonicalName() + "$EventTime",
+        false, fields);
+  }
+
+  private static void createInputFile()
+      throws IOException {
+    INDEX_DIR.mkdir();
+    Schema avroSchema = createRecord("eventsRecord", null, null, false);
+    List<Field> fields = Arrays.asList(
+        new Field(INT_COLUMN, createUnion(Lists.newArrayList(create(Type.INT), create(Type.NULL))), null, null),
+        new Field(STRING_COLUMN, createUnion(Lists.newArrayList(create(Type.STRING), create(Type.NULL))), null, null),
+        new Field(JSON_COLUMN, createArray(createJsonRecordSchema())),
+        new Field(EVENTTIME_JSON_COLUMN, createEventTimeRecordSchema())
+
+    );
+    avroSchema.setFields(fields);
+    List<GenericRow> inputRecords = new ArrayList<>();
+    // Insert ARRAY
+    inputRecords.add(
+        createTableRecord(1, "daffy duck", Arrays.asList(
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390721)
+                    .set("data", Map.of("a", "1", "b", "2"))
+                    .build(),
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390722)
+                    .set("data", Map.of("a", "2", "b", "4"))
+                    .build()
+            ),
+            new GenericRecordBuilder(createEventTimeRecordSchema())
+                .set("seconds", 1719390721)
+                .build()));
+
+    // Insert MAP
+    inputRecords.add(
+        createTableRecord(2, "mickey mouse", Arrays.asList(
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390722)
+                    .set("data", Map.of("a", "2", "b", "4"))
+                    .build(),
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390723)
+                    .set("data", Map.of("a", "3", "b", "6"))
+                    .build()
+            ),
+            new GenericRecordBuilder(createEventTimeRecordSchema())
+                .set("seconds", 1719390722)
+                .build()));
+
+    inputRecords.add(
+        createTableRecord(3, "donald duck", Arrays.asList(
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390723)
+                    .set("data", Map.of("a", "3", "b", "6"))
+                    .build(),
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390724)
+                    .set("data", Map.of("a", "4", "b", "8"))
+                    .build()
+            ),
+            new GenericRecordBuilder(createEventTimeRecordSchema())
+                .set("seconds", 1719390723)
+                .build()));
+
+    inputRecords.add(
+        createTableRecord(4, "scrooge mcduck", Arrays.asList(
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390724)
+                    .set("data", Map.of("a", "4", "b", "8"))
+                    .build(),
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390725)
+                    .set("data", Map.of("a", "5", "b", "10"))
+                    .build()
+            ),
+            new GenericRecordBuilder(createEventTimeRecordSchema())
+                .set("seconds", 1719390724)
+                .build()));
+
+    // insert RECORD
+    inputRecords.add(createTableRecord(5, "minney mouse", Arrays.asList(
+            new GenericRecordBuilder(createJsonRecordSchema())
+                .set("timestamp", 1719390725)
+                .set("data", Map.of("a", "5", "b", "10"))
+                .build(),
+            new GenericRecordBuilder(createJsonRecordSchema())
+                .set("timestamp", 1719390726)
+                .set("data", Map.of("a", "6", "b", "12"))
+                .build()
+        ),
+        new GenericRecordBuilder(createEventTimeRecordSchema())
+            .set("seconds", 1719390725)
+            .build()));
+
+    // Insert simple Java String (gets converted into JSON value)
+    inputRecords.add(
+        createTableRecord(6, "pluto", Arrays.asList(
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390726)
+                    .set("data", Map.of("a", "6", "b", "12"))
+                    .build(),
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390727)
+                    .set("data", Map.of("a", "7", "b", "14"))
+                    .build()
+            ),
+            new GenericRecordBuilder(createEventTimeRecordSchema())
+                .set("seconds", 1719390726)
+                .build()));
+
+    // Insert JSON string (gets converted into JSON document)
+    inputRecords.add(
+        createTableRecord(7, "scooby doo", Arrays.asList(
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390727)
+                    .set("data", Map.of("a", "7", "b", "14"))
+                    .build(),
+                new GenericRecordBuilder(createJsonRecordSchema())
+                    .set("timestamp", 1719390728)
+                    .set("data", Map.of("a", "8", "b", "16"))
+                    .build()
+            ),
+            new GenericRecordBuilder(createEventTimeRecordSchema())
+                .set("seconds", 1719390727)
+                .build()));
+
+    try (DataFileWriter<GenericData.Record> fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) {
+      fileWriter.create(avroSchema, AVRO_DATA_FILE);
+      for (GenericRow inputRecord : inputRecords) {
+        GenericData.Record record = new GenericData.Record(avroSchema);
+        record.put(INT_COLUMN, inputRecord.getValue(INT_COLUMN));
+        record.put(STRING_COLUMN, inputRecord.getValue(STRING_COLUMN));
+        record.put(JSON_COLUMN, inputRecord.getValue(JSON_COLUMN));
+        record.put(EVENTTIME_JSON_COLUMN, inputRecord.getValue(EVENTTIME_JSON_COLUMN));
+        fileWriter.append(record);
+      }
+    }
+  }
+
+  private static RecordReader createRecordReader()
+      throws IOException {
+    Set<String> set = new HashSet<>();
+    set.add(INT_COLUMN);
+    set.add(STRING_COLUMN);
+    set.add(JSON_COLUMN);
+    set.add(EVENTTIME_JSON_COLUMN);
+    AvroRecordReader avroRecordReader = new AvroRecordReader();
+    avroRecordReader.init(AVRO_DATA_FILE, set, null);
+    return avroRecordReader;
+  }
+
+  /** Create an AVRO file and then ingest it into Pinot while creating a JsonIndex. */
+  @BeforeClass
+  public void setUp()
+      throws Exception {
+    FileUtils.deleteDirectory(INDEX_DIR);
+    createInputFile();
+
+    List<String> jsonIndexColumns = Arrays.asList(JSON_COLUMN);
+    TABLE_CONFIG.getIndexingConfig().setJsonIndexColumns(jsonIndexColumns);
+    SegmentGeneratorConfig segmentGeneratorConfig = new SegmentGeneratorConfig(TABLE_CONFIG, SCHEMA);
+    segmentGeneratorConfig.setTableName(RAW_TABLE_NAME);
+    segmentGeneratorConfig.setSegmentName(SEGMENT_NAME);
+    segmentGeneratorConfig.setOutDir(INDEX_DIR.getPath());
+    segmentGeneratorConfig.setInputFilePath(AVRO_DATA_FILE.getPath());
+
+    SegmentIndexCreationDriverImpl driver = new SegmentIndexCreationDriverImpl();
+    driver.init(segmentGeneratorConfig, createRecordReader());
+    driver.build();
+
+    IndexLoadingConfig indexLoadingConfig = new IndexLoadingConfig();
+    indexLoadingConfig.setTableConfig(TABLE_CONFIG);
+    indexLoadingConfig.setJsonIndexColumns(new HashSet<>(jsonIndexColumns));
+    indexLoadingConfig.setReadMode(ReadMode.mmap);
+
+    ImmutableSegment immutableSegment =
+        ImmutableSegmentLoader.load(new File(INDEX_DIR, SEGMENT_NAME), indexLoadingConfig);
+    _indexSegment = immutableSegment;
+    _indexSegments = Arrays.asList(immutableSegment, immutableSegment);
+  }
+
+  @Test
+  public void testComplexSelectOnJsonColumn() {
+    Operator<SelectionResultsBlock> operator = getOperator(
+        "select intColumn, stringColumn, jsonColumn, \"jsonColumn.timestamp\", 
jsonColumn.data, jsonColumn.data.a, "
+            + "jsonColumn.data.b, eventTimeColumn, eventTimeColumn_10m FROM 
testTable LIMIT 1000");
+    SelectionResultsBlock block = operator.nextBlock();
+    Collection<Object[]> rows = block.getRows();
+    Assert.assertEquals(block.getDataSchema().getColumnDataType(0), DataSchema.ColumnDataType.INT);
+    Assert.assertEquals(block.getDataSchema().getColumnDataType(1), DataSchema.ColumnDataType.STRING);
+    Assert.assertEquals(block.getDataSchema().getColumnDataType(2), DataSchema.ColumnDataType.JSON);
+    Assert.assertEquals(block.getDataSchema().getColumnDataType(3), DataSchema.ColumnDataType.TIMESTAMP);
+    Assert.assertEquals(block.getDataSchema().getColumnDataType(4), DataSchema.ColumnDataType.JSON);
+    Assert.assertEquals(block.getDataSchema().getColumnDataType(5), DataSchema.ColumnDataType.STRING);
+    Assert.assertEquals(block.getDataSchema().getColumnDataType(6), DataSchema.ColumnDataType.STRING);
+    Assert.assertEquals(block.getDataSchema().getColumnDataType(7), DataSchema.ColumnDataType.TIMESTAMP);
+    Assert.assertEquals(block.getDataSchema().getColumnDataType(8), DataSchema.ColumnDataType.TIMESTAMP);
+
+    List<String> expecteds = Arrays.asList(
+        "[1, daffy duck, 
[{\"data\":{\"a\":\"1\",\"b\":\"2\"},\"timestamp\":1719390721},{\"data\":{\"a\":\"2\","
+            + "\"b\":\"4\"},\"timestamp\":1719390722}], 1719390721, 
{\"a\":\"1\",\"b\":\"2\"}, 1, 2, 1719390721000, "
+            + "1719390720000]",
+        "[1, daffy duck, 
[{\"data\":{\"a\":\"1\",\"b\":\"2\"},\"timestamp\":1719390721},{\"data\":{\"a\":\"2\","
+            + "\"b\":\"4\"},\"timestamp\":1719390722}], 1719390722, 
{\"a\":\"2\",\"b\":\"4\"}, 2, 4, 1719390721000, "
+            + "1719390720000]",
+        "[2, mickey mouse, 
[{\"data\":{\"a\":\"2\",\"b\":\"4\"},\"timestamp\":1719390722},{\"data\":{\"a\":\"3\","
+            + "\"b\":\"6\"},\"timestamp\":1719390723}], 1719390722, 
{\"a\":\"2\",\"b\":\"4\"}, 2, 4, 1719390722000, "
+            + "1719390720000]",
+        "[2, mickey mouse, 
[{\"data\":{\"a\":\"2\",\"b\":\"4\"},\"timestamp\":1719390722},{\"data\":{\"a\":\"3\","
+            + "\"b\":\"6\"},\"timestamp\":1719390723}], 1719390723, 
{\"a\":\"3\",\"b\":\"6\"}, 3, 6, 1719390722000, "
+            + "1719390720000]",
+        "[3, donald duck, 
[{\"data\":{\"a\":\"3\",\"b\":\"6\"},\"timestamp\":1719390723},{\"data\":{\"a\":\"4\","
+            + "\"b\":\"8\"},\"timestamp\":1719390724}], 1719390723, 
{\"a\":\"3\",\"b\":\"6\"}, 3, 6, 1719390723000, "
+            + "1719390720000]",
+        "[3, donald duck, 
[{\"data\":{\"a\":\"3\",\"b\":\"6\"},\"timestamp\":1719390723},{\"data\":{\"a\":\"4\","
+            + "\"b\":\"8\"},\"timestamp\":1719390724}], 1719390724, 
{\"a\":\"4\",\"b\":\"8\"}, 4, 8, 1719390723000, "
+            + "1719390720000]",
+        "[4, scrooge mcduck, 
[{\"data\":{\"a\":\"4\",\"b\":\"8\"},\"timestamp\":1719390724},{\"data\":{\"a\":\"5\","
+            + "\"b\":\"10\"},\"timestamp\":1719390725}], 1719390724, 
{\"a\":\"4\",\"b\":\"8\"}, 4, 8, 1719390724000, "
+            + "1719390720000]",
+        "[4, scrooge mcduck, 
[{\"data\":{\"a\":\"4\",\"b\":\"8\"},\"timestamp\":1719390724},{\"data\":{\"a\":\"5\","
+            + "\"b\":\"10\"},\"timestamp\":1719390725}], 1719390725, 
{\"a\":\"5\",\"b\":\"10\"}, 5, 10, "
+            + "1719390724000, 1719390720000]",
+        "[5, minney mouse, 
[{\"data\":{\"a\":\"5\",\"b\":\"10\"},\"timestamp\":1719390725},{\"data\":{\"a\":\"6\","
+            + "\"b\":\"12\"},\"timestamp\":1719390726}], 1719390725, 
{\"a\":\"5\",\"b\":\"10\"}, 5, 10, "
+            + "1719390725000, 1719390720000]",
+        "[5, minney mouse, 
[{\"data\":{\"a\":\"5\",\"b\":\"10\"},\"timestamp\":1719390725},{\"data\":{\"a\":\"6\","
+            + "\"b\":\"12\"},\"timestamp\":1719390726}], 1719390726, 
{\"a\":\"6\",\"b\":\"12\"}, 6, 12, "
+            + "1719390725000, 1719390720000]",
+        "[6, pluto, 
[{\"data\":{\"a\":\"6\",\"b\":\"12\"},\"timestamp\":1719390726},{\"data\":{\"a\":\"7\","
+            + "\"b\":\"14\"},\"timestamp\":1719390727}], 1719390726, 
{\"a\":\"6\",\"b\":\"12\"}, 6, 12, "
+            + "1719390726000, 1719390720000]",
+        "[6, pluto, 
[{\"data\":{\"a\":\"6\",\"b\":\"12\"},\"timestamp\":1719390726},{\"data\":{\"a\":\"7\","
+            + "\"b\":\"14\"},\"timestamp\":1719390727}], 1719390727, 
{\"a\":\"7\",\"b\":\"14\"}, 7, 14, "
+            + "1719390726000, 1719390720000]",
+        "[7, scooby doo, 
[{\"data\":{\"a\":\"7\",\"b\":\"14\"},\"timestamp\":1719390727},{\"data\":{\"a\":\"8\","
+            + "\"b\":\"16\"},\"timestamp\":1719390728}], 1719390727, 
{\"a\":\"7\",\"b\":\"14\"}, 7, 14, "
+            + "1719390727000, 1719390720000]",
+        "[7, scooby doo, 
[{\"data\":{\"a\":\"7\",\"b\":\"14\"},\"timestamp\":1719390727},{\"data\":{\"a\":\"8\","
+            + "\"b\":\"16\"},\"timestamp\":1719390728}], 1719390728, 
{\"a\":\"8\",\"b\":\"16\"}, 8, 16, "
+            + "1719390727000, 1719390720000]");
+    Assert.assertEquals(rows.size(), 14);
+    int index = 0;
+    for (Object[] row : rows) {
+      System.out.println(Arrays.toString(row));
+      Assert.assertEquals(Arrays.toString(row), expecteds.get(index++));
+    }
+  }
+
+  @AfterClass
+  public void tearDown()
+      throws IOException {
+    _indexSegment.destroy();
+    FileUtils.deleteDirectory(INDEX_DIR);
+  }
+}
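
Editor's note: a quick editorial check of the values the two TransformConfigs
above should produce for the first input record, assuming Pinot's
round(timeValue, roundToNearest) floors to the bucket boundary:

public class TransformChainCheck {
  public static void main(String[] args) {
    long seconds = 1719390721L;                    // eventTimeColumn.seconds of record 1
    long eventTimeMs = seconds * 1000;             // "eventTimeColumn.seconds * 1000"
    long bucketed = (eventTimeMs / 60000) * 60000; // assumed semantics of "round(eventTimeColumn, 60000)"
    System.out.println(eventTimeMs);               // 1719390721000, matching the expected rows
    System.out.println(bucketed);                  // 1719390720000, matching the expected rows
  }
}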
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java
index 48db4c6ad3..2bec0ced42 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformer.java
@@ -98,7 +98,7 @@ public class ComplexTypeTransformer implements RecordTransformer {
 
   public ComplexTypeTransformer(TableConfig tableConfig) {
     this(parseFieldsToUnnest(tableConfig), parseDelimiter(tableConfig),
-            parseCollectionNotUnnestedToJson(tableConfig), parsePrefixesToRename(tableConfig), tableConfig);
+        parseCollectionNotUnnestedToJson(tableConfig), parsePrefixesToRename(tableConfig), tableConfig);
   }
 
   @VisibleForTesting
@@ -163,7 +163,7 @@ public class ComplexTypeTransformer implements RecordTransformer {
 
   private static Map<String, String> parsePrefixesToRename(TableConfig tableConfig) {
     if (tableConfig.getIngestionConfig() != null && tableConfig.getIngestionConfig().getComplexTypeConfig() != null
-            && tableConfig.getIngestionConfig().getComplexTypeConfig().getPrefixesToRename() != null) {
+        && tableConfig.getIngestionConfig().getComplexTypeConfig().getPrefixesToRename() != null) {
       return tableConfig.getIngestionConfig().getComplexTypeConfig().getPrefixesToRename();
     } else {
       return Collections.emptyMap();
@@ -173,10 +173,20 @@ public class ComplexTypeTransformer implements RecordTransformer {
   @Override
   public GenericRow transform(GenericRow record) {
     try {
+      GenericRow originalRow = _fieldsToUnnest.isEmpty() ? null : record.copy(_fieldsToUnnest);
       flattenMap(record, new ArrayList<>(record.getFieldToValueMap().keySet()));
-      for (String collection : _fieldsToUnnest) {
-        unnestCollection(record, collection);
+      for (String field : _fieldsToUnnest) {
+        unnestCollection(record, field);
       }
+      Object unnestedRows = record.getValue(GenericRow.MULTIPLE_RECORDS_KEY);
+      if (originalRow != null && unnestedRows instanceof Collection) {
+        for (GenericRow unnestedRow : (Collection<GenericRow>) unnestedRows) {
+          for (String field : _fieldsToUnnest) {
+            unnestedRow.putValue(field, originalRow.getValue(field));
+          }
+        }
+      }
+
       renamePrefixes(record);
     } catch (Exception e) {
       if (!_continueOnError) {
@@ -207,7 +217,7 @@ public class ComplexTypeTransformer implements RecordTransformer {
   }
 
   private void unnestCollection(GenericRow record, String column, List<GenericRow> list) {
-    Object value = record.removeValue(column);
+    Object value = record.getValue(column);
     if (value == null) {
       // use the record itself
       list.add(record);
@@ -329,7 +339,7 @@ public class ComplexTypeTransformer implements RecordTransformer {
           String newName = replacementPrefix + remainingColumnName;
           if (newName.isEmpty() || record.getValue(newName) != null) {
             throw new RuntimeException(
-                    String.format("Name conflict after attempting to rename 
field %s to %s", field, newName));
+                String.format("Name conflict after attempting to rename field 
%s to %s", field, newName));
           }
           record.putValue(newName, value);
         }
@@ -366,7 +376,7 @@ public class ComplexTypeTransformer implements RecordTransformer {
       Object value = map.get(field);
       String concatName = concat(path, field);
       if (value instanceof Map) {
-        Map<String, Object> innerMap = (Map<String, Object>) map.remove(field);
+        Map<String, Object> innerMap = (Map<String, Object>) value;
         List<String> innerMapFields = new ArrayList<>();
         for (Map.Entry<String, Object> innerEntry : new ArrayList<>(innerMap.entrySet())) {
           Object innerValue = innerEntry.getValue();
diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
index 97054e7174..0f97838517 100644
--- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
+++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/recordtransformer/ExpressionTransformer.java
@@ -20,6 +20,7 @@ package org.apache.pinot.segment.local.recordtransformer;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
@@ -116,10 +117,13 @@ public class ExpressionTransformer implements RecordTransformer {
     for (Map.Entry<String, FunctionEvaluator> entry : _expressionEvaluators.entrySet()) {
       String column = entry.getKey();
       FunctionEvaluator transformFunctionEvaluator = entry.getValue();
-      // Skip transformation if column value already exist.
-      // NOTE: column value might already exist for OFFLINE data
-      if (record.getValue(column) == null) {
+      Object existingValue = record.getValue(column);
+      if (existingValue == null) {
         try {
+          // Skip transformation if the column value already exists.
+          // NOTE: the column value might already exist for OFFLINE data.
+          // For backward compatibility, the only exception is nested fields such as arrays, collections and
+          // maps, which were not covered by record transformation before; those are re-evaluated below.
           record.putValue(column, transformFunctionEvaluator.evaluate(record));
         } catch (Exception e) {
           if (!_continueOnError) {
@@ -129,8 +133,33 @@ public class ExpressionTransformer implements RecordTransformer {
             record.putValue(GenericRow.INCOMPLETE_RECORD_KEY, true);
           }
         }
+      } else if (existingValue.getClass().isArray() || existingValue instanceof Collection
+          || existingValue instanceof Map) {
+        try {
+          Object transformedValue = transformFunctionEvaluator.evaluate(record);
+          // For backward compatibility, override nested fields such as arrays, collections and maps when the
+          // transformed value has an incompatible type, since they were not covered by record transformation before.
+          if (!isTypeCompatible(existingValue, transformedValue)) {
+            record.putValue(column, transformedValue);
+          }
+        } catch (Exception e) {
+          LOGGER.debug("Caught exception while evaluation transform function 
for column: {}", column, e);
+        }
       }
     }
     return record;
   }
+
+  private boolean isTypeCompatible(Object existingValue, Object transformedValue) {
+    if (transformedValue.getClass() == existingValue.getClass()) {
+      return true;
+    }
+    if (transformedValue instanceof Collection && existingValue instanceof Collection) {
+      return true;
+    }
+    if (transformedValue instanceof Map && existingValue instanceof Map) {
+      return true;
+    }
+    return transformedValue.getClass().isArray() && existingValue.getClass().isArray();
+  }
 }
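
Editor's note: a standalone paraphrase (not part of the commit) of the
compatibility check above, with a tiny usage example; existing values that are
nested containers are overridden only when the transformed value has an
incompatible shape.

import java.util.Collection;
import java.util.List;
import java.util.Map;

public class NestedOverrideCheck {
  // Same rule as isTypeCompatible: identical class, both Collections, both Maps, or both arrays.
  static boolean isTypeCompatible(Object existing, Object transformed) {
    if (transformed.getClass() == existing.getClass()) {
      return true;
    }
    if (transformed instanceof Collection && existing instanceof Collection) {
      return true;
    }
    if (transformed instanceof Map && existing instanceof Map) {
      return true;
    }
    return transformed.getClass().isArray() && existing.getClass().isArray();
  }

  public static void main(String[] args) {
    // Compatible shapes: keep the existing value.
    System.out.println(isTypeCompatible(List.of("a"), List.of("b"))); // true
    // A flattened Map sits in the column but the transform yields a scalar: override it.
    System.out.println(isTypeCompatible(Map.of("seconds", 1L), 1000L)); // false
  }
}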
diff --git a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformerTest.java b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformerTest.java
index 543053b55b..dc24badb47 100644
--- a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformerTest.java
+++ b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/recordtransformer/ComplexTypeTransformerTest.java
@@ -354,7 +354,7 @@ public class ComplexTypeTransformerTest {
     //   "array":"[1,2]"
     // }
     transformer = new ComplexTypeTransformer(Arrays.asList(), ".",
-            ComplexTypeConfig.CollectionNotUnnestedToJson.ALL, new HashMap<>(), null);
+        ComplexTypeConfig.CollectionNotUnnestedToJson.ALL, new HashMap<>(), null);
     genericRow = new GenericRow();
     array = new Object[]{1, 2};
     genericRow.putValue("array", array);
@@ -400,7 +400,7 @@ public class ComplexTypeTransformerTest {
     map.put("array1", array1);
     genericRow.putValue("t", map);
     transformer = new ComplexTypeTransformer(Arrays.asList(), ".",
-            ComplexTypeConfig.CollectionNotUnnestedToJson.NONE, new HashMap<>(), null);
+        ComplexTypeConfig.CollectionNotUnnestedToJson.NONE, new HashMap<>(), null);
     transformer.transform(genericRow);
     Assert.assertTrue(ComplexTypeTransformer.isNonPrimitiveArray(genericRow.getValue("t.array1")));
   }
@@ -411,7 +411,7 @@ public class ComplexTypeTransformerTest {
     prefixesToRename.put("map1.", "");
     prefixesToRename.put("map2", "test");
     ComplexTypeTransformer transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
-            DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
+        DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
 
     GenericRow genericRow = new GenericRow();
     genericRow.putValue("a", 1L);
@@ -426,7 +426,7 @@ public class ComplexTypeTransformerTest {
     prefixesToRename = new HashMap<>();
     prefixesToRename.put("test.", "");
     transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
-            DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
+        DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
     genericRow = new GenericRow();
     genericRow.putValue("a", 1L);
     genericRow.putValue("test.a", 2L);
@@ -441,7 +441,7 @@ public class ComplexTypeTransformerTest {
     prefixesToRename = new HashMap<>();
     prefixesToRename.put("test", "");
     transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
-            DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
+        DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
     genericRow = new GenericRow();
     genericRow.putValue("a", 1L);
     genericRow.putValue("test", 2L);
@@ -455,7 +455,7 @@ public class ComplexTypeTransformerTest {
     // case where nothing gets renamed
     prefixesToRename = new HashMap<>();
     transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
-            DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
+        DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
     genericRow = new GenericRow();
     genericRow.putValue("a", 1L);
     genericRow.putValue("test", 2L);
@@ -470,7 +470,7 @@ public class ComplexTypeTransformerTest {
     prefixesToRename.put("map1.", "");
     prefixesToRename.put("map2", "test");
     ComplexTypeTransformer transformer = new ComplexTypeTransformer(new ArrayList<>(), ".",
-            DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
+        DEFAULT_COLLECTION_TO_JSON_MODE, prefixesToRename, null);
 
     // test flatten root-level tuples
     GenericRow genericRow = new GenericRow();
diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
index e4e46fa4ca..7361b6ed54 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/data/readers/GenericRow.java
@@ -155,6 +155,17 @@ public class GenericRow implements Serializable {
     return copy;
   }
 
+  /**
+   * @return a deep copy of the generic row for the given fields
+   */
+  public GenericRow copy(List<String> fieldsToCopy) {
+    GenericRow copy = new GenericRow();
+    for (String field : fieldsToCopy) {
+      copy.putValue(field, copy(getValue(field)));
+    }
+    return copy;
+  }
+
   /**
    * @return a deep copy of the object.
    */
@@ -174,6 +185,9 @@ public class GenericRow implements Serializable {
       }
       return list;
     } else if (value.getClass().isArray()) {
+      if (value instanceof byte[]) {
+        return ((byte[]) value).clone();
+      }
       Object[] array = new Object[((Object[]) value).length];
       int idx = 0;
       for (Object object : (Object[]) value) {
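
Editor's note: a small illustrative sketch (not part of the commit) of the new
field-scoped copy and the byte[] deep-copy fix; the field names are
hypothetical.

import java.util.List;
import org.apache.pinot.spi.data.readers.GenericRow;

public class GenericRowCopySketch {
  public static void main(String[] args) {
    GenericRow row = new GenericRow();
    row.putValue("payload", new byte[] {0, 0, 0, 1});
    row.putValue("name", "daffy duck");

    // Only the requested fields are copied, each through the deep copy above.
    GenericRow copy = row.copy(List.of("payload"));

    ((byte[]) copy.getValue("payload"))[3] = 9; // mutate the copy...
    System.out.println(((byte[]) row.getValue("payload"))[3]); // ...source row still prints 1
    System.out.println(copy.getValue("name")); // null: "name" was not requested
  }
}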

