abhishekagarwal87 commented on code in PR #15755: URL: https://github.com/apache/druid/pull/15755#discussion_r1466201507
########## docs/ingestion/input-sources.md: ########## @@ -715,6 +715,13 @@ rolled-up datasource `wikipedia_rollup` by grouping on hour, "countryName", and to `true` to enable a compatibility mode where the timestampSpec is ignored. ::: +The [secondary partitioning method](native-batch.md#partitionsspec) determines the requisite number of concurrent worker tasks that run in parallel to complete ingestion with the Combining input source. +Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the secondary partitioning method: +- `range` or `single_dim` partitioning: greater than or equal to 1 +- `hashed` or `dynamic` partitioning: greater than or equal to 2 + +For more information on the `maxNumConcurrentSubTasks` field, see [Implementation considerations](native-batch.md#implementation-considerations). Review Comment: seems unrelated to the PR ########## extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java: ########## @@ -0,0 +1,204 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.delta.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.Iterators; +import com.google.common.primitives.Ints; +import io.delta.kernel.Scan; +import io.delta.kernel.Snapshot; +import io.delta.kernel.Table; +import io.delta.kernel.TableNotFoundException; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.FilteredColumnarBatch; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.client.DefaultTableClient; +import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.utils.CloseableIterator; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.error.InvalidInput; +import org.apache.druid.utils.Streams; +import org.apache.hadoop.conf.Configuration; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * Input source to ingest data from a Delta Lake. + * This input source reads the latest snapshot from a Delta table specified by {@code tablePath} parameter. + * Note: the kernel table API only supports reading from the latest snapshot. 
+ */ +public class DeltaInputSource implements SplittableInputSource<DeltaSplit> +{ + public static final String TYPE_KEY = "delta"; + + @JsonProperty + private final String tablePath; + + @JsonProperty + @Nullable + private final DeltaSplit deltaSplit; + + @JsonCreator + public DeltaInputSource( + @JsonProperty("tablePath") String tablePath, + @JsonProperty("deltaSplit") @Nullable DeltaSplit deltaSplit + ) + { + if (tablePath == null) { + throw InvalidInput.exception("tablePath cannot be null"); + } + this.tablePath = tablePath; + this.deltaSplit = deltaSplit; + } + + @Override + public boolean needsFormat() + { + // Only support Parquet + return false; + } + + @Override + public InputSourceReader reader( + InputRowSchema inputRowSchema, + @Nullable InputFormat inputFormat, + File temporaryDirectory + ) + { + Configuration hadoopConf = new Configuration(); + TableClient tableClient = DefaultTableClient.create(hadoopConf); + try { + final Row scanState; + final List<Row> scanRowList; + Review Comment: can you add some comments describing the implementation? ########## extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java: ########## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.delta.input; + +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DecimalType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampType; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.Row; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.error.InvalidInput; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Encodes the row and schema information from the Delta Lake. 
+ */ +public class DeltaInputRow implements InputRow +{ + private final io.delta.kernel.data.Row row; + private final StructType schema; + private final Object2IntMap<String> fieldNameToOrdinal = new Object2IntOpenHashMap<>(); + private final InputRow delegateRow; + + private static final ZoneId ZONE_ID = ZoneId.systemDefault(); + + public DeltaInputRow(io.delta.kernel.data.Row row, InputRowSchema inputRowSchema) + { + this.row = row; + this.schema = row.getSchema(); + List<String> fieldNames = this.schema.fieldNames(); + for (int i = 0; i < fieldNames.size(); ++i) { + fieldNameToOrdinal.put(fieldNames.get(i), i); + } + fieldNameToOrdinal.defaultReturnValue(-1); + + Map<String, Object> theMap = new HashMap<>(); + for (String fieldName : fieldNames) { + theMap.put(fieldName, _getRaw(fieldName)); + } + delegateRow = MapInputRowParser.parse(inputRowSchema, theMap); + } + + @Override + public List<String> getDimensions() + { + return delegateRow.getDimensions(); + } + + @Override + public long getTimestampFromEpoch() + { + return delegateRow.getTimestampFromEpoch(); + } + + @Override + public DateTime getTimestamp() + { + return delegateRow.getTimestamp(); + } + + @Override + public List<String> getDimension(String dimension) + { + return delegateRow.getDimension(dimension); + } + + @Nullable + @Override + public Object getRaw(String dimension) + { + return delegateRow.getRaw(dimension); + } + + @Nullable + @Override + public Number getMetric(String metric) + { + return delegateRow.getMetric(metric); + } + + @Override + public int compareTo(Row o) + { + return this.getTimestamp().compareTo(o.getTimestamp()); + } + + @Override + public boolean equals(Object o) + { + return o instanceof DeltaInputRow && compareTo((DeltaInputRow) o) == 0; + } + + @Override + public int hashCode() + { + return Objects.hash(row, schema, fieldNameToOrdinal, delegateRow); + } + + @Override + public String toString() + { + return "DeltaInputRow{" + + "row=" + row + + ", schema=" + schema + + 
", fieldNameToOrdinal=" + fieldNameToOrdinal + + ", delegateRow=" + delegateRow + + '}'; + } + + public Map<String, Object> getRawRowAsMap() + { + return RowSerde.convertRowToJsonObject(row); + } + + @Nullable + private Object _getRaw(String dimension) + { + StructField field = schema.get(dimension); + if (field == null || field.isMetadataColumn()) { + return null; + } + + int ordinal = fieldNameToOrdinal.getInt(dimension); + if (ordinal < 0) { + return null; + } + return getValue(field.getDataType(), row, ordinal); + } + + @Nullable + private static Object getValue(DataType dataType, io.delta.kernel.data.Row dataRow, int columnOrdinal) + { + if (dataRow.isNullAt(columnOrdinal)) { + return null; + } else if (dataType instanceof BooleanType) { + return dataRow.getBoolean(columnOrdinal); + } else if (dataType instanceof ByteType) { + return dataRow.getByte(columnOrdinal); + } else if (dataType instanceof ShortType) { + return dataRow.getShort(columnOrdinal); + } else if (dataType instanceof IntegerType) { + return dataRow.getInt(columnOrdinal); + } else if (dataType instanceof DateType) { + // DateType data is stored internally as the number of days since 1970-01-01 + int daysSinceEpochUTC = dataRow.getInt(columnOrdinal); + return LocalDate.ofEpochDay(daysSinceEpochUTC).atStartOfDay(ZONE_ID).toEpochSecond(); + } else if (dataType instanceof LongType) { + return dataRow.getLong(columnOrdinal); + } else if (dataType instanceof TimestampType) { + // TimestampType data is stored internally as the number of microseconds since epoch + long microSecsSinceEpochUTC = dataRow.getLong(columnOrdinal); + LocalDateTime dateTime = LocalDateTime.ofEpochSecond( + microSecsSinceEpochUTC / 1_000_000 /* epochSecond */, + (int) (1000 * microSecsSinceEpochUTC % 1_000_000) /* nanoOfSecond */, + ZoneOffset.UTC + ); + return dateTime.atZone(ZONE_ID).toInstant().toEpochMilli(); + } else if (dataType instanceof FloatType) { + return dataRow.getFloat(columnOrdinal); + } else if (dataType 
instanceof DoubleType) { + return dataRow.getDouble(columnOrdinal); + } else if (dataType instanceof StringType) { + return dataRow.getString(columnOrdinal); + } else if (dataType instanceof BinaryType) { + final byte[] arr = dataRow.getBinary(columnOrdinal); + final char[] charArray = new char[arr.length]; + for (int i = 0; i < arr.length; i++) { + charArray[i] = (char) (arr[i] & 0xff); + } + return String.valueOf(charArray); + } else if (dataType instanceof DecimalType) { + return dataRow.getDecimal(columnOrdinal).longValue(); + } else { + throw InvalidInput.exception("Unsupported data type[%s]", dataType); Review Comment: we should add fieldName to the exception message. ########## extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.delta.input; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class DeltaInputSourceTest +{ + @Test + public void testReadDeltaLakeFilesSample() throws IOException + { + final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtil.DELTA_TABLE_PATH, null); + Assert.assertNotNull(deltaInputSource); + + InputSourceReader inputSourceReader = deltaInputSource.reader(DeltaTestUtil.SCHEMA, null, null); + Assert.assertNotNull(inputSourceReader); Review Comment: inputSourceReader can never be null. ########## extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java: ########## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.delta.input; + +import io.delta.kernel.types.BinaryType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DecimalType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampType; +import it.unimi.dsi.fastutil.objects.Object2IntMap; +import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap; +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.Row; +import org.apache.druid.data.input.impl.MapInputRowParser; +import org.apache.druid.error.InvalidInput; +import org.joda.time.DateTime; + +import javax.annotation.Nullable; +import java.time.LocalDate; +import java.time.LocalDateTime; +import java.time.ZoneId; +import java.time.ZoneOffset; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +/** + * Encodes the row and schema information from the Delta Lake. 
+ */ +public class DeltaInputRow implements InputRow +{ + private final io.delta.kernel.data.Row row; + private final StructType schema; + private final Object2IntMap<String> fieldNameToOrdinal = new Object2IntOpenHashMap<>(); + private final InputRow delegateRow; + + private static final ZoneId ZONE_ID = ZoneId.systemDefault(); + + public DeltaInputRow(io.delta.kernel.data.Row row, InputRowSchema inputRowSchema) + { + this.row = row; + this.schema = row.getSchema(); + List<String> fieldNames = this.schema.fieldNames(); + for (int i = 0; i < fieldNames.size(); ++i) { + fieldNameToOrdinal.put(fieldNames.get(i), i); + } + fieldNameToOrdinal.defaultReturnValue(-1); + + Map<String, Object> theMap = new HashMap<>(); + for (String fieldName : fieldNames) { + theMap.put(fieldName, _getRaw(fieldName)); + } + delegateRow = MapInputRowParser.parse(inputRowSchema, theMap); + } + + @Override + public List<String> getDimensions() + { + return delegateRow.getDimensions(); + } + + @Override + public long getTimestampFromEpoch() + { + return delegateRow.getTimestampFromEpoch(); + } + + @Override + public DateTime getTimestamp() + { + return delegateRow.getTimestamp(); + } + + @Override + public List<String> getDimension(String dimension) + { + return delegateRow.getDimension(dimension); + } + + @Nullable + @Override + public Object getRaw(String dimension) + { + return delegateRow.getRaw(dimension); + } + + @Nullable + @Override + public Number getMetric(String metric) + { + return delegateRow.getMetric(metric); + } + + @Override + public int compareTo(Row o) + { + return this.getTimestamp().compareTo(o.getTimestamp()); + } + + @Override + public boolean equals(Object o) + { + return o instanceof DeltaInputRow && compareTo((DeltaInputRow) o) == 0; + } + + @Override + public int hashCode() + { + return Objects.hash(row, schema, fieldNameToOrdinal, delegateRow); + } + + @Override + public String toString() + { + return "DeltaInputRow{" + + "row=" + row + + ", schema=" + schema + + 
", fieldNameToOrdinal=" + fieldNameToOrdinal + + ", delegateRow=" + delegateRow + + '}'; + } + + public Map<String, Object> getRawRowAsMap() + { + return RowSerde.convertRowToJsonObject(row); + } + + @Nullable + private Object _getRaw(String dimension) + { + StructField field = schema.get(dimension); + if (field == null || field.isMetadataColumn()) { + return null; + } + + int ordinal = fieldNameToOrdinal.getInt(dimension); + if (ordinal < 0) { + return null; + } + return getValue(field.getDataType(), row, ordinal); + } + + @Nullable + private static Object getValue(DataType dataType, io.delta.kernel.data.Row dataRow, int columnOrdinal) + { + if (dataRow.isNullAt(columnOrdinal)) { + return null; + } else if (dataType instanceof BooleanType) { + return dataRow.getBoolean(columnOrdinal); + } else if (dataType instanceof ByteType) { + return dataRow.getByte(columnOrdinal); + } else if (dataType instanceof ShortType) { + return dataRow.getShort(columnOrdinal); + } else if (dataType instanceof IntegerType) { + return dataRow.getInt(columnOrdinal); + } else if (dataType instanceof DateType) { + // DateType data is stored internally as the number of days since 1970-01-01 + int daysSinceEpochUTC = dataRow.getInt(columnOrdinal); + return LocalDate.ofEpochDay(daysSinceEpochUTC).atStartOfDay(ZONE_ID).toEpochSecond(); + } else if (dataType instanceof LongType) { + return dataRow.getLong(columnOrdinal); + } else if (dataType instanceof TimestampType) { + // TimestampType data is stored internally as the number of microseconds since epoch + long microSecsSinceEpochUTC = dataRow.getLong(columnOrdinal); + LocalDateTime dateTime = LocalDateTime.ofEpochSecond( + microSecsSinceEpochUTC / 1_000_000 /* epochSecond */, + (int) (1000 * microSecsSinceEpochUTC % 1_000_000) /* nanoOfSecond */, + ZoneOffset.UTC + ); + return dateTime.atZone(ZONE_ID).toInstant().toEpochMilli(); + } else if (dataType instanceof FloatType) { + return dataRow.getFloat(columnOrdinal); + } else if (dataType 
instanceof DoubleType) { + return dataRow.getDouble(columnOrdinal); + } else if (dataType instanceof StringType) { + return dataRow.getString(columnOrdinal); + } else if (dataType instanceof BinaryType) { + final byte[] arr = dataRow.getBinary(columnOrdinal); + final char[] charArray = new char[arr.length]; + for (int i = 0; i < arr.length; i++) { + charArray[i] = (char) (arr[i] & 0xff); + } + return String.valueOf(charArray); Review Comment: can you describe the logic? why does a single byte need to be masked with `0xff`? ########## extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java: ########## @@ -0,0 +1,141 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.delta.input; + +import org.apache.druid.data.input.InputRow; +import org.apache.druid.data.input.InputRowListPlusRawValues; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.java.util.common.parsers.CloseableIterator; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +public class DeltaInputSourceTest +{ + @Test + public void testReadDeltaLakeFilesSample() throws IOException + { + final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtil.DELTA_TABLE_PATH, null); + Assert.assertNotNull(deltaInputSource); Review Comment: why? :) ########## extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py: ########## @@ -0,0 +1,89 @@ +#!/usr/bin/env python3 + +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +import os + +import argparse +import delta +import mimesis +import pyspark + + +def config_spark_with_delta_lake(): Review Comment: if this file is borrowed from some place, please acknowledge the source in comments. ########## extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java: ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.delta.input; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.node.ObjectNode; +import io.delta.kernel.client.TableClient; +import io.delta.kernel.data.Row; +import io.delta.kernel.defaults.internal.data.DefaultJsonRow; +import io.delta.kernel.internal.types.TableSchemaSerDe; +import io.delta.kernel.internal.util.VectorUtils; +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.BooleanType; +import io.delta.kernel.types.ByteType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.DateType; +import io.delta.kernel.types.DoubleType; +import io.delta.kernel.types.FloatType; +import io.delta.kernel.types.IntegerType; +import io.delta.kernel.types.LongType; +import io.delta.kernel.types.MapType; +import io.delta.kernel.types.ShortType; +import io.delta.kernel.types.StringType; +import io.delta.kernel.types.StructField; +import io.delta.kernel.types.StructType; +import io.delta.kernel.types.TimestampType; +import org.apache.druid.error.InvalidInput; + +import java.util.HashMap; +import java.util.Map; + +/** + * Utility class to serialize and deserialize {@link Row} object. + * Code borrowed from <a href="https://github.com/delta-io/delta/blob/master/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/utils/RowSerDe.java"> Review Comment: can you add what's different here from the original class? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
