abhishekrb19 commented on code in PR #15755:
URL: https://github.com/apache/druid/pull/15755#discussion_r1466773437


##########
docs/ingestion/input-sources.md:
##########
@@ -715,6 +715,13 @@ rolled-up datasource `wikipedia_rollup` by grouping on 
hour, "countryName", and
  to `true` to enable a compatibility mode where the timestampSpec is ignored.
 :::
 
+The [secondary partitioning method](native-batch.md#partitionsspec) determines 
the requisite number of concurrent worker tasks that run in parallel to 
complete ingestion with the Combining input source.
+Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the 
secondary partitioning method:
+- `range` or `single_dim` partitioning: greater than or equal to 1
+- `hashed` or `dynamic` partitioning: greater than or equal to 2
+
+For more information on the `maxNumConcurrentSubTasks` field, see 
[Implementation considerations](native-batch.md#implementation-considerations).

Review Comment:
   Yeah, it's not directly related to this extension, but I noticed that this text 
was nested inside the Iceberg section: [Iceberg Filter 
Object](https://druid.apache.org/docs/latest/ingestion/input-sources#iceberg-filter-object),
 where it seemed irrelevant. So I moved it to the Druid input source section, where it 
seems more relevant.



##########
extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.error.InvalidInput;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Encodes the row and schema information from the Delta Lake.
+ */
+public class DeltaInputRow implements InputRow
+{
+  private final io.delta.kernel.data.Row row;
+  private final StructType schema;
+  private final Object2IntMap<String> fieldNameToOrdinal = new 
Object2IntOpenHashMap<>();
+  private final InputRow delegateRow;
+
+  private static final ZoneId ZONE_ID = ZoneId.systemDefault();
+
+  public DeltaInputRow(io.delta.kernel.data.Row row, InputRowSchema 
inputRowSchema)
+  {
+    this.row = row;
+    this.schema = row.getSchema();
+    List<String> fieldNames = this.schema.fieldNames();
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      fieldNameToOrdinal.put(fieldNames.get(i), i);
+    }
+    fieldNameToOrdinal.defaultReturnValue(-1);
+
+    Map<String, Object> theMap = new HashMap<>();
+    for (String fieldName : fieldNames) {
+      theMap.put(fieldName, _getRaw(fieldName));
+    }
+    delegateRow = MapInputRowParser.parse(inputRowSchema, theMap);
+  }
+
+  @Override
+  public List<String> getDimensions()
+  {
+    return delegateRow.getDimensions();
+  }
+
+  @Override
+  public long getTimestampFromEpoch()
+  {
+    return delegateRow.getTimestampFromEpoch();
+  }
+
+  @Override
+  public DateTime getTimestamp()
+  {
+    return delegateRow.getTimestamp();
+  }
+
+  @Override
+  public List<String> getDimension(String dimension)
+  {
+    return delegateRow.getDimension(dimension);
+  }
+
+  @Nullable
+  @Override
+  public Object getRaw(String dimension)
+  {
+    return delegateRow.getRaw(dimension);
+  }
+
+  @Nullable
+  @Override
+  public Number getMetric(String metric)
+  {
+    return delegateRow.getMetric(metric);
+  }
+
+  @Override
+  public int compareTo(Row o)
+  {
+    return this.getTimestamp().compareTo(o.getTimestamp());
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    return o instanceof DeltaInputRow && compareTo((DeltaInputRow) o) == 0;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(row, schema, fieldNameToOrdinal, delegateRow);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DeltaInputRow{" +
+           "row=" + row +
+           ", schema=" + schema +
+           ", fieldNameToOrdinal=" + fieldNameToOrdinal +
+           ", delegateRow=" + delegateRow +
+           '}';
+  }
+
+  public Map<String, Object> getRawRowAsMap()
+  {
+    return RowSerde.convertRowToJsonObject(row);
+  }
+
+  @Nullable
+  private Object _getRaw(String dimension)
+  {
+    StructField field = schema.get(dimension);
+    if (field == null || field.isMetadataColumn()) {
+      return null;
+    }
+
+    int ordinal = fieldNameToOrdinal.getInt(dimension);
+    if (ordinal < 0) {
+      return null;
+    }
+    return getValue(field.getDataType(), row, ordinal);
+  }
+
+  @Nullable
+  private static Object getValue(DataType dataType, io.delta.kernel.data.Row 
dataRow, int columnOrdinal)
+  {
+    if (dataRow.isNullAt(columnOrdinal)) {
+      return null;
+    } else if (dataType instanceof BooleanType) {
+      return dataRow.getBoolean(columnOrdinal);
+    } else if (dataType instanceof ByteType) {
+      return dataRow.getByte(columnOrdinal);
+    } else if (dataType instanceof ShortType) {
+      return dataRow.getShort(columnOrdinal);
+    } else if (dataType instanceof IntegerType) {
+      return dataRow.getInt(columnOrdinal);
+    } else if (dataType instanceof DateType) {
+      // DateType data is stored internally as the number of days since 
1970-01-01
+      int daysSinceEpochUTC = dataRow.getInt(columnOrdinal);
+      return 
LocalDate.ofEpochDay(daysSinceEpochUTC).atStartOfDay(ZONE_ID).toEpochSecond();
+    } else if (dataType instanceof LongType) {
+      return dataRow.getLong(columnOrdinal);
+    } else if (dataType instanceof TimestampType) {
+      // TimestampType data is stored internally as the number of microseconds 
since epoch
+      long microSecsSinceEpochUTC = dataRow.getLong(columnOrdinal);
+      LocalDateTime dateTime = LocalDateTime.ofEpochSecond(
+          microSecsSinceEpochUTC / 1_000_000 /* epochSecond */,
+          (int) (1000 * microSecsSinceEpochUTC % 1_000_000) /* nanoOfSecond */,
+          ZoneOffset.UTC
+      );
+      return dateTime.atZone(ZONE_ID).toInstant().toEpochMilli();
+    } else if (dataType instanceof FloatType) {
+      return dataRow.getFloat(columnOrdinal);
+    } else if (dataType instanceof DoubleType) {
+      return dataRow.getDouble(columnOrdinal);
+    } else if (dataType instanceof StringType) {
+      return dataRow.getString(columnOrdinal);
+    } else if (dataType instanceof BinaryType) {
+      final byte[] arr = dataRow.getBinary(columnOrdinal);
+      final char[] charArray = new char[arr.length];
+      for (int i = 0; i < arr.length; i++) {
+        charArray[i] = (char) (arr[i] & 0xff);
+      }
+      return String.valueOf(charArray);
+    } else if (dataType instanceof DecimalType) {
+      return dataRow.getDecimal(columnOrdinal).longValue();
+    } else {
+      throw InvalidInput.exception("Unsupported data type[%s]", dataType);

Review Comment:
   added



##########
extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java:
##########
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.error.InvalidInput;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Encodes the row and schema information from the Delta Lake.
+ */
+public class DeltaInputRow implements InputRow
+{
+  private final io.delta.kernel.data.Row row;
+  private final StructType schema;
+  private final Object2IntMap<String> fieldNameToOrdinal = new 
Object2IntOpenHashMap<>();
+  private final InputRow delegateRow;
+
+  private static final ZoneId ZONE_ID = ZoneId.systemDefault();
+
+  public DeltaInputRow(io.delta.kernel.data.Row row, InputRowSchema 
inputRowSchema)
+  {
+    this.row = row;
+    this.schema = row.getSchema();
+    List<String> fieldNames = this.schema.fieldNames();
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      fieldNameToOrdinal.put(fieldNames.get(i), i);
+    }
+    fieldNameToOrdinal.defaultReturnValue(-1);
+
+    Map<String, Object> theMap = new HashMap<>();
+    for (String fieldName : fieldNames) {
+      theMap.put(fieldName, _getRaw(fieldName));
+    }
+    delegateRow = MapInputRowParser.parse(inputRowSchema, theMap);
+  }
+
+  @Override
+  public List<String> getDimensions()
+  {
+    return delegateRow.getDimensions();
+  }
+
+  @Override
+  public long getTimestampFromEpoch()
+  {
+    return delegateRow.getTimestampFromEpoch();
+  }
+
+  @Override
+  public DateTime getTimestamp()
+  {
+    return delegateRow.getTimestamp();
+  }
+
+  @Override
+  public List<String> getDimension(String dimension)
+  {
+    return delegateRow.getDimension(dimension);
+  }
+
+  @Nullable
+  @Override
+  public Object getRaw(String dimension)
+  {
+    return delegateRow.getRaw(dimension);
+  }
+
+  @Nullable
+  @Override
+  public Number getMetric(String metric)
+  {
+    return delegateRow.getMetric(metric);
+  }
+
+  @Override
+  public int compareTo(Row o)
+  {
+    return this.getTimestamp().compareTo(o.getTimestamp());
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    return o instanceof DeltaInputRow && compareTo((DeltaInputRow) o) == 0;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(row, schema, fieldNameToOrdinal, delegateRow);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DeltaInputRow{" +
+           "row=" + row +
+           ", schema=" + schema +
+           ", fieldNameToOrdinal=" + fieldNameToOrdinal +
+           ", delegateRow=" + delegateRow +
+           '}';
+  }
+
+  public Map<String, Object> getRawRowAsMap()
+  {
+    return RowSerde.convertRowToJsonObject(row);
+  }
+
+  @Nullable
+  private Object _getRaw(String dimension)
+  {
+    StructField field = schema.get(dimension);
+    if (field == null || field.isMetadataColumn()) {
+      return null;
+    }
+
+    int ordinal = fieldNameToOrdinal.getInt(dimension);
+    if (ordinal < 0) {
+      return null;
+    }
+    return getValue(field.getDataType(), row, ordinal);
+  }
+
+  @Nullable
+  private static Object getValue(DataType dataType, io.delta.kernel.data.Row 
dataRow, int columnOrdinal)
+  {
+    if (dataRow.isNullAt(columnOrdinal)) {
+      return null;
+    } else if (dataType instanceof BooleanType) {
+      return dataRow.getBoolean(columnOrdinal);
+    } else if (dataType instanceof ByteType) {
+      return dataRow.getByte(columnOrdinal);
+    } else if (dataType instanceof ShortType) {
+      return dataRow.getShort(columnOrdinal);
+    } else if (dataType instanceof IntegerType) {
+      return dataRow.getInt(columnOrdinal);
+    } else if (dataType instanceof DateType) {
+      // DateType data is stored internally as the number of days since 
1970-01-01
+      int daysSinceEpochUTC = dataRow.getInt(columnOrdinal);
+      return 
LocalDate.ofEpochDay(daysSinceEpochUTC).atStartOfDay(ZONE_ID).toEpochSecond();
+    } else if (dataType instanceof LongType) {
+      return dataRow.getLong(columnOrdinal);
+    } else if (dataType instanceof TimestampType) {
+      // TimestampType data is stored internally as the number of microseconds 
since epoch
+      long microSecsSinceEpochUTC = dataRow.getLong(columnOrdinal);
+      LocalDateTime dateTime = LocalDateTime.ofEpochSecond(
+          microSecsSinceEpochUTC / 1_000_000 /* epochSecond */,
+          (int) (1000 * microSecsSinceEpochUTC % 1_000_000) /* nanoOfSecond */,
+          ZoneOffset.UTC
+      );
+      return dateTime.atZone(ZONE_ID).toInstant().toEpochMilli();
+    } else if (dataType instanceof FloatType) {
+      return dataRow.getFloat(columnOrdinal);
+    } else if (dataType instanceof DoubleType) {
+      return dataRow.getDouble(columnOrdinal);
+    } else if (dataType instanceof StringType) {
+      return dataRow.getString(columnOrdinal);
+    } else if (dataType instanceof BinaryType) {
+      final byte[] arr = dataRow.getBinary(columnOrdinal);
+      final char[] charArray = new char[arr.length];
+      for (int i = 0; i < arr.length; i++) {
+        charArray[i] = (char) (arr[i] & 0xff);
+      }
+      return String.valueOf(charArray);

Review Comment:
   Good catch — I was trying to convert it to an unsigned value, which is not 
needed in this case.



##########
extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.utils.CloseableIterator;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.utils.Streams;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Input source to ingest data from a Delta Lake.
+ * This input source reads the latest snapshot from a Delta table specified by 
{@code tablePath} parameter.
+ * Note: the kernel table API only supports reading from the latest snapshot.
+ */
+public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
+{
+  public static final String TYPE_KEY = "delta";
+
+  @JsonProperty
+  private final String tablePath;
+
+  @JsonProperty
+  @Nullable
+  private final DeltaSplit deltaSplit;
+
+  @JsonCreator
+  public DeltaInputSource(
+      @JsonProperty("tablePath") String tablePath,
+      @JsonProperty("deltaSplit") @Nullable DeltaSplit deltaSplit
+  )
+  {
+    if (tablePath == null) {
+      throw InvalidInput.exception("tablePath cannot be null");
+    }
+    this.tablePath = tablePath;
+    this.deltaSplit = deltaSplit;
+  }
+
+  @Override
+  public boolean needsFormat()
+  {
+    // Only support Parquet
+    return false;
+  }
+
+  @Override
+  public InputSourceReader reader(
+      InputRowSchema inputRowSchema,
+      @Nullable InputFormat inputFormat,
+      File temporaryDirectory
+  )
+  {
+    Configuration hadoopConf = new Configuration();
+    TableClient tableClient = DefaultTableClient.create(hadoopConf);
+    try {
+      final Row scanState;
+      final List<Row> scanRowList;
+

Review Comment:
   Added javadocs to this function



##########
extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py:
##########
@@ -0,0 +1,89 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import argparse
+import delta
+import mimesis
+import pyspark
+
+
+def config_spark_with_delta_lake():

Review Comment:
   Will do. I might change this test setup a bit to get different datatype 
coverage, etc.



##########
extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java:
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
+import io.delta.kernel.internal.types.TableSchemaSerDe;
+import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import org.apache.druid.error.InvalidInput;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class to serialize and deserialize {@link Row} object.
+ * Code borrowed from <a 
href="https://github.com/delta-io/delta/blob/master/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/utils/RowSerDe.java";>

Review Comment:
   Updated, good point



##########
extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java:
##########
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+public class DeltaInputSourceTest
+{
+  @Test
+  public void testReadDeltaLakeFilesSample() throws IOException
+  {
+    final DeltaInputSource deltaInputSource = new 
DeltaInputSource(DeltaTestUtil.DELTA_TABLE_PATH, null);
+    Assert.assertNotNull(deltaInputSource);

Review Comment:
   Heh, not sure, but I removed these unnecessary assert checks.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to