This is an automated email from the ASF dual-hosted git repository.

abhishek pushed a commit to branch 29.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/29.0.0 by this push:
     new 8438d0fa658 [Backport] Extension to read and ingest Delta Lake tables (#15755) (#15812)
8438d0fa658 is described below

commit 8438d0fa65888e46ea109fea5311ec885493d63b
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Wed Jan 31 01:37:46 2024 -0800

    [Backport] Extension to read and ingest Delta Lake tables (#15755) (#15812)
    
    First commit: clean backport of Extension to read and ingest Delta Lake tables #15755
    Second commit: change version from 30.0.0 to 29.0.0
---
 distribution/pom.xml                               |   4 +
 docs/development/extensions-contrib/delta-lake.md  |  44 +++
 docs/ingestion/input-sources.md                    |  39 ++-
 .../druid-deltalake-extensions/pom.xml             | 156 ++++++++++
 .../druid/delta/common/DeltaLakeDruidModule.java   |  70 +++++
 .../apache/druid/delta/input/DeltaInputRow.java    | 208 +++++++++++++
 .../apache/druid/delta/input/DeltaInputSource.java | 261 +++++++++++++++++
 .../druid/delta/input/DeltaInputSourceReader.java  | 138 +++++++++
 .../org/apache/druid/delta/input/DeltaSplit.java   |  71 +++++
 .../apache/druid/delta/input/DeltaTimeUtils.java   |  59 ++++
 .../org/apache/druid/delta/input/RowSerde.java     | 158 ++++++++++
 .../org.apache.druid.initialization.DruidModule    |  16 +
 .../druid/delta/input/DeltaInputRowTest.java       |  80 +++++
 .../druid/delta/input/DeltaInputSourceTest.java    | 256 ++++++++++++++++
 .../apache/druid/delta/input/DeltaTestUtils.java   | 322 +++++++++++++++++++++
 .../druid/delta/input/DeltaTimeUtilsTest.java      |  80 +++++
 .../org/apache/druid/delta/input/RowSerdeTest.java |  45 +++
 .../src/test/resources/README.md                   |  68 +++++
 .../src/test/resources/create_delta_table.py       | 122 ++++++++
 ...-4e49-b8e7-46132a57d039-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet.crc | Bin 0 -> 16 bytes
 ...-40f2-9185-45237f51b9bf-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-4189-8927-97fe1720df8d-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-4a5c-b504-62c7c27702c1-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-475a-afbb-44ae0b9cba30-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-4145-ac40-6ccbc04acc79-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-43b7-87db-448c67a315df-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-444c-8984-6baecf6987ee-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-415f-b28a-a85d960ccb04-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-477d-b5fd-d00a857f7e54-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-4092-8ea2-0ee576ca7327-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-45ab-92d4-24d725cc07e1-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-4f1c-8c07-3dac19b12e91-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-4c02-b256-bc890fadce7e-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 ...-4f80-bd63-e369c6335699-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
 .../_delta_log/.00000000000000000000.json.crc      | Bin 0 -> 72 bytes
 .../_delta_log/.00000000000000000001.json.crc      | Bin 0 -> 36 bytes
 .../_delta_log/00000000000000000000.json           |  13 +
 .../_delta_log/00000000000000000001.json           |   6 +
 ...0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet | Bin 0 -> 2316 bytes
 ...c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet | Bin 0 -> 979 bytes
 ...db98-40f2-9185-45237f51b9bf-c000.snappy.parquet | Bin 0 -> 2316 bytes
 ...a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet | Bin 0 -> 2301 bytes
 ...0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet | Bin 0 -> 2455 bytes
 ...8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet | Bin 0 -> 2317 bytes
 ...bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet | Bin 0 -> 2454 bytes
 ...54cb-43b7-87db-448c67a315df-c000.snappy.parquet | Bin 0 -> 2302 bytes
 ...c414-444c-8984-6baecf6987ee-c000.snappy.parquet | Bin 0 -> 2316 bytes
 ...5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet | Bin 0 -> 2302 bytes
 ...faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet | Bin 0 -> 2317 bytes
 ...e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet | Bin 0 -> 2455 bytes
 ...b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet | Bin 0 -> 2302 bytes
 ...2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet | Bin 0 -> 2455 bytes
 ...e177-4c02-b256-bc890fadce7e-c000.snappy.parquet | Bin 0 -> 2454 bytes
 ...aaec-4f80-bd63-e369c6335699-c000.snappy.parquet | Bin 0 -> 2324 bytes
 .../src/test/resources/requirements.txt            |   2 +
 pom.xml                                            |   2 +
 web-console/assets/delta.png                       | Bin 0 -> 6527 bytes
 .../druid-models/ingestion-spec/ingestion-spec.tsx |  20 +-
 .../src/druid-models/input-source/input-source.tsx |  36 ++-
 .../__snapshots__/load-data-view.spec.tsx.snap     |  14 +
 .../src/views/load-data-view/load-data-view.tsx    |  10 +
 .../input-source-step/input-source-info.tsx        |   3 +
 website/.spelling                                  |   9 +-
 64 files changed, 2303 insertions(+), 9 deletions(-)

diff --git a/distribution/pom.xml b/distribution/pom.xml
index 4376e28056e..0c14b913e3f 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -257,6 +257,8 @@
                                         <argument>org.apache.druid.extensions:druid-kubernetes-extensions</argument>
                                         <argument>-c</argument>
                                         <argument>org.apache.druid.extensions:druid-catalog</argument>
+                                        <argument>-c</argument>
+                                        <argument>org.apache.druid.extensions:druid-deltalake-extensions</argument>
                                         <argument>${druid.distribution.pulldeps.opts}</argument>
                                     </arguments>
                                 </configuration>
@@ -453,6 +455,8 @@
                                         <argument>-c</argument>
                                         <argument>org.apache.druid.extensions:druid-iceberg-extensions</argument>
                                         <argument>-c</argument>
+                                        <argument>org.apache.druid.extensions:druid-deltalake-extensions</argument>
+                                        <argument>-c</argument>
                                         <argument>org.apache.druid.extensions.contrib:druid-spectator-histogram</argument>
                                     </arguments>
                                 </configuration>
diff --git a/docs/development/extensions-contrib/delta-lake.md b/docs/development/extensions-contrib/delta-lake.md
new file mode 100644
index 00000000000..8b1de484971
--- /dev/null
+++ b/docs/development/extensions-contrib/delta-lake.md
@@ -0,0 +1,44 @@
+---
+id: delta-lake
+title: "Delta Lake extension"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+## Delta Lake extension
+
+
+Delta Lake is an open source storage framework that enables building a
+Lakehouse architecture with various compute engines. [DeltaLakeInputSource](../../ingestion/input-sources.md#delta-lake-input-source) lets
+you ingest data stored in a Delta Lake table into Apache Druid. To use the Delta Lake extension, add `druid-deltalake-extensions` to the list of loaded extensions.
+See [Loading extensions](../../configuration/extensions.md#loading-extensions) for more information.
+
+The Delta input source reads the configured Delta Lake table and extracts all the underlying delta files in the table's latest snapshot.
+These Delta Lake files are versioned Parquet files.
+
+## Version support
+
+The Delta Lake extension uses the Delta Kernel introduced in Delta Lake 3.0.0, which is compatible with Apache Spark 3.5.x.
+Older versions are unsupported, so consider upgrading to Delta Lake 3.0.x or higher to use this extension.
+
+## Known limitations
+
+- This extension relies on the Delta Kernel API and can only read from the latest Delta table snapshot.
+- Column filtering isn't supported. The extension reads all columns in the configured table.
\ No newline at end of file
diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index 6e1be89c53d..d4693e7925a 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -715,6 +715,13 @@ rolled-up datasource `wikipedia_rollup` by grouping on hour, "countryName", and
  to `true` to enable a compatibility mode where the timestampSpec is ignored.
 :::
 
+The [secondary partitioning method](native-batch.md#partitionsspec) determines the requisite number of concurrent worker tasks that run in parallel to complete ingestion with the Combining input source.
+Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the secondary partitioning method:
+- `range` or `single_dim` partitioning: greater than or equal to 1
+- `hashed` or `dynamic` partitioning: greater than or equal to 2
+
+For more information on the `maxNumConcurrentSubTasks` field, see [Implementation considerations](native-batch.md#implementation-considerations).
+
 ## SQL input source
 
 The SQL input source is used to read data directly from RDBMS.
@@ -817,7 +824,7 @@ The following is an example of a Combining input source spec:
 ## Iceberg input source
 
 :::info
- To use the Iceberg input source, add the `druid-iceberg-extensions` extension.
+To use the Iceberg input source, load the extension [`druid-iceberg-extensions`](../development/extensions-contrib/iceberg.md).
 :::
 
 You use the Iceberg input source to read data stored in the Iceberg table format. For a given table, the input source scans up to the latest Iceberg snapshot from the configured Hive catalog. Druid ingests the underlying live data files using the existing input source formats.
@@ -1006,11 +1013,31 @@ This input source provides the following filters: `and`, `equals`, `interval`, a
 |type|Set this value to `not`.|yes|
 |filter|The iceberg filter on which logical NOT is applied|yes|
 
+## Delta Lake input source
 
+:::info
+To use the Delta Lake input source, load the extension [`druid-deltalake-extensions`](../development/extensions-contrib/delta-lake.md).
+:::
 
-The [secondary partitioning method](native-batch.md#partitionsspec) determines the requisite number of concurrent worker tasks that run in parallel to complete ingestion with the Combining input source.
-Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the secondary partitioning method:
-- `range` or `single_dim` partitioning: greater than or equal to 1
-- `hashed` or `dynamic` partitioning: greater than or equal to 2
+You can use the Delta input source to read data stored in a Delta Lake table. For a given table, the input source scans
+the latest snapshot from the configured table. Druid ingests the underlying delta files from the table.
+
+The following is a sample spec:
+
+```json
+...
+    "ioConfig": {
+      "type": "index_parallel",
+      "inputSource": {
+        "type": "delta",
+        "tablePath": "/delta-table/directory"
+      }
+    }
+}
+```
+
+| Property|Description|Required|
+|---------|-----------|--------|
+| type|Set this value to `delta`.|yes|
+| tablePath|The location of the Delta table.|yes|
 
-For more information on the `maxNumConcurrentSubTasks` field, see [Implementation considerations](native-batch.md#implementation-considerations).
diff --git a/extensions-contrib/druid-deltalake-extensions/pom.xml b/extensions-contrib/druid-deltalake-extensions/pom.xml
new file mode 100644
index 00000000000..14020aa5c8a
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/pom.xml
@@ -0,0 +1,156 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <groupId>org.apache.druid.extensions</groupId>
+  <artifactId>druid-deltalake-extensions</artifactId>
+  <name>druid-deltalake-extensions</name>
+  <description>Delta Lake connector for Druid</description>
+
+  <parent>
+    <artifactId>druid</artifactId>
+    <groupId>org.apache.druid</groupId>
+    <version>29.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <properties>
+    <delta-kernel.version>3.0.0</delta-kernel.version>
+  </properties>
+
+  <dependencies>
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-kernel-api</artifactId>
+      <version>${delta-kernel.version}</version>
+    </dependency>
+
+    <dependency>
+      <groupId>io.delta</groupId>
+      <artifactId>delta-kernel-defaults</artifactId>
+      <version>${delta-kernel.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client-api</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <scope>compile</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${project.parent.version}</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <scope>provided</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.google.code.findbugs</groupId>
+      <artifactId>jsr305</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.google.inject</groupId>
+      <artifactId>guice</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-core</artifactId>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.12.7.1</version>
+    </dependency>
+    <dependency>
+      <groupId>it.unimi.dsi</groupId>
+      <artifactId>fastutil-core</artifactId>
+      <version>8.5.4</version>
+      <scope>provided</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-aws</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <scope>runtime</scope>
+      <exclusions>
+        <exclusion>
+          <groupId>com.amazonaws</groupId>
+          <artifactId>aws-java-sdk-bundle</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.druid</groupId>
+      <artifactId>druid-processing</artifactId>
+      <version>${project.parent.version}</version>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-core</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.owasp</groupId>
+        <artifactId>dependency-check-maven</artifactId>
+        <configuration>
+          <skip>true</skip>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java
new file mode 100644
index 00000000000..70c2b6ff655
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.common;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.delta.input.DeltaInputSource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Collections;
+import java.util.List;
+
+public class DeltaLakeDruidModule implements DruidModule
+{
+  @Override
+  public List<? extends Module> getJacksonModules()
+  {
+    return Collections.singletonList(
+        new SimpleModule("DeltaLakeDruidModule")
+            .registerSubtypes(
+                new NamedType(DeltaInputSource.class, DeltaInputSource.TYPE_KEY)
+            )
+    );
+  }
+
+  @Override
+  public void configure(Binder binder)
+  {
+    final Configuration conf = new Configuration();
+    conf.setClassLoader(getClass().getClassLoader());
+
+    ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      FileSystem.get(conf);
+    }
+    catch (Exception ex) {
+      throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
+                          .ofCategory(DruidException.Category.UNCATEGORIZED)
+                          .build(ex, "Problem during fileSystem class level initialization");
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxCl);
+    }
+  }
+
+}
+
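A minimal sketch of what the `NamedType` registration above buys at ingestion time: once the module's Jackson modules are registered, a `"type": "delta"` input source spec deserializes to `DeltaInputSource`. The manual `ObjectMapper` wiring and the standalone class are illustrative; in a cluster, Druid performs this registration while loading the extension.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.delta.common.DeltaLakeDruidModule;

public class DeltaModuleSketch
{
  public static void main(String[] args) throws Exception
  {
    // Register the module's Jackson modules by hand (Druid does this at startup).
    final ObjectMapper mapper = new ObjectMapper();
    new DeltaLakeDruidModule().getJacksonModules().forEach(mapper::registerModule);

    // The "type": "delta" discriminator now resolves to DeltaInputSource.
    final InputSource inputSource = mapper.readValue(
        "{\"type\": \"delta\", \"tablePath\": \"/delta-table/directory\"}",
        InputSource.class
    );
    System.out.println(inputSource.getClass().getSimpleName()); // DeltaInputSource
  }
}
```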
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java
new file mode 100644
index 00000000000..acf909452a0
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.error.InvalidInput;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Encodes the row and schema information from a Delta Lake table.
+ */
+public class DeltaInputRow implements InputRow
+{
+  private final io.delta.kernel.data.Row row;
+  private final StructType schema;
+  private final Object2IntMap<String> fieldNameToOrdinal = new Object2IntOpenHashMap<>();
+  private final InputRow delegateRow;
+
+  public DeltaInputRow(io.delta.kernel.data.Row row, InputRowSchema inputRowSchema)
+  {
+    this.row = row;
+    this.schema = row.getSchema();
+    List<String> fieldNames = this.schema.fieldNames();
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      fieldNameToOrdinal.put(fieldNames.get(i), i);
+    }
+    fieldNameToOrdinal.defaultReturnValue(-1);
+
+    Map<String, Object> theMap = new HashMap<>();
+    for (String fieldName : fieldNames) {
+      theMap.put(fieldName, _getRaw(fieldName));
+    }
+    delegateRow = MapInputRowParser.parse(inputRowSchema, theMap);
+  }
+
+  @Override
+  public List<String> getDimensions()
+  {
+    return delegateRow.getDimensions();
+  }
+
+  @Override
+  public long getTimestampFromEpoch()
+  {
+    return delegateRow.getTimestampFromEpoch();
+  }
+
+  @Override
+  public DateTime getTimestamp()
+  {
+    return delegateRow.getTimestamp();
+  }
+
+  @Override
+  public List<String> getDimension(String dimension)
+  {
+    return delegateRow.getDimension(dimension);
+  }
+
+  @Nullable
+  @Override
+  public Object getRaw(String dimension)
+  {
+    return delegateRow.getRaw(dimension);
+  }
+
+  @Nullable
+  @Override
+  public Number getMetric(String metric)
+  {
+    return delegateRow.getMetric(metric);
+  }
+
+  @Override
+  public int compareTo(Row o)
+  {
+    return this.getTimestamp().compareTo(o.getTimestamp());
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    return o instanceof DeltaInputRow && compareTo((DeltaInputRow) o) == 0;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(row, schema, fieldNameToOrdinal, delegateRow);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DeltaInputRow{" +
+           "row=" + row +
+           ", schema=" + schema +
+           ", fieldNameToOrdinal=" + fieldNameToOrdinal +
+           ", delegateRow=" + delegateRow +
+           '}';
+  }
+
+  public Map<String, Object> getRawRowAsMap()
+  {
+    return RowSerde.convertRowToJsonObject(row);
+  }
+
+  @Nullable
+  private Object _getRaw(String dimension)
+  {
+    StructField field = schema.get(dimension);
+    if (field == null || field.isMetadataColumn()) {
+      return null;
+    }
+
+    int ordinal = fieldNameToOrdinal.getInt(dimension);
+    if (ordinal < 0) {
+      return null;
+    }
+    return getValue(field.getDataType(), row, ordinal);
+  }
+
+  @Nullable
+  private static Object getValue(DataType dataType, io.delta.kernel.data.Row dataRow, int columnOrdinal)
+  {
+    if (dataRow.isNullAt(columnOrdinal)) {
+      return null;
+    } else if (dataType instanceof BooleanType) {
+      return dataRow.getBoolean(columnOrdinal);
+    } else if (dataType instanceof ByteType) {
+      return dataRow.getByte(columnOrdinal);
+    } else if (dataType instanceof ShortType) {
+      return dataRow.getShort(columnOrdinal);
+    } else if (dataType instanceof IntegerType) {
+      return dataRow.getInt(columnOrdinal);
+    } else if (dataType instanceof DateType) {
+      return DeltaTimeUtils.getSecondsFromDate(dataRow.getInt(columnOrdinal));
+    } else if (dataType instanceof LongType) {
+      return dataRow.getLong(columnOrdinal);
+    } else if (dataType instanceof TimestampType) {
+      return DeltaTimeUtils.getMillisFromTimestamp(dataRow.getLong(columnOrdinal));
+    } else if (dataType instanceof FloatType) {
+      return dataRow.getFloat(columnOrdinal);
+    } else if (dataType instanceof DoubleType) {
+      return dataRow.getDouble(columnOrdinal);
+    } else if (dataType instanceof StringType) {
+      return dataRow.getString(columnOrdinal);
+    } else if (dataType instanceof BinaryType) {
+      final byte[] arr = dataRow.getBinary(columnOrdinal);
+      final char[] charArray = new char[arr.length];
+      for (int i = 0; i < arr.length; i++) {
+        charArray[i] = (char) (arr[i]);
+      }
+      return String.valueOf(charArray);
+    } else if (dataType instanceof DecimalType) {
+      return dataRow.getDecimal(columnOrdinal).longValue();
+    } else {
+      throw InvalidInput.exception(
+          "Unsupported data type[%s] for fieldName[%s].",
+          dataType,
+          dataRow.getSchema().fieldNames().get(columnOrdinal)
+      );
+    }
+  }
+}
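One subtlety in the `BinaryType` branch above: each byte is widened to a `char` with a plain cast, which sign-extends. A self-contained check of that behavior (illustrative only, not part of the patch):

```java
public class BinaryWideningSketch
{
  public static void main(String[] args)
  {
    // ASCII bytes map to the expected characters.
    final byte[] ascii = {0x44, 0x65, 0x6C, 0x74, 0x61};
    final char[] chars = new char[ascii.length];
    for (int i = 0; i < ascii.length; i++) {
      chars[i] = (char) ascii[i];
    }
    System.out.println(String.valueOf(chars)); // prints "Delta"

    // Bytes >= 0x80 sign-extend: (byte) 0xE9 becomes '\uFFE9', not '\u00E9'.
    System.out.println((int) (char) (byte) 0xE9); // prints 65513
  }
}
```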
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java
new file mode 100644
index 00000000000..936e2dd70d6
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.utils.Streams;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Input source to ingest data from a Delta Lake.
+ * This input source reads the latest snapshot from a Delta table specified by the {@code tablePath} parameter.
+ * We leverage the Delta Kernel APIs to interact with a Delta table. The Kernel API abstracts away the
+ * complexities of the Delta protocol itself.
+ * Note: currently, the Kernel table API only supports reading from the latest snapshot.
+ */
+public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
+{
+  public static final String TYPE_KEY = "delta";
+
+  @JsonProperty
+  private final String tablePath;
+
+  @JsonProperty
+  @Nullable
+  private final DeltaSplit deltaSplit;
+
+  @JsonCreator
+  public DeltaInputSource(
+      @JsonProperty("tablePath") String tablePath,
+      @JsonProperty("deltaSplit") @Nullable DeltaSplit deltaSplit
+  )
+  {
+    if (tablePath == null) {
+      throw InvalidInput.exception("tablePath cannot be null.");
+    }
+    this.tablePath = tablePath;
+    this.deltaSplit = deltaSplit;
+  }
+
+  @Override
+  public boolean needsFormat()
+  {
+    // Only support Parquet
+    return false;
+  }
+
+  /**
+   * Instantiates a {@link DeltaInputSourceReader} to read the Delta table rows. If a {@link DeltaSplit} is supplied,
+   * the Delta files and schema are obtained from it to instantiate the reader. Otherwise, a Delta table client is
+   * instantiated with the supplied configuration to read the table.
+   *
+   * @param inputRowSchema     schema for {@link org.apache.druid.data.input.InputRow}
+   * @param inputFormat        unused parameter. The input format is always Parquet.
+   * @param temporaryDirectory unused parameter
+   */
+  @Override
+  public InputSourceReader reader(
+      InputRowSchema inputRowSchema,
+      @Nullable InputFormat inputFormat,
+      File temporaryDirectory
+  )
+  {
+    final TableClient tableClient = createTableClient();
+    try {
+      final Row scanState;
+      final List<Row> scanRowList;
+
+      if (deltaSplit != null) {
+        scanState = deserialize(tableClient, deltaSplit.getStateRow());
+        scanRowList = deltaSplit.getFiles()
+                                .stream()
+                                .map(row -> deserialize(tableClient, row))
+                                .collect(Collectors.toList());
+      } else {
+        final Table table = Table.forPath(tableClient, tablePath);
+        final Snapshot latestSnapshot = table.getLatestSnapshot(tableClient);
+        final StructType prunedSchema = pruneSchema(
+            latestSnapshot.getSchema(tableClient),
+            inputRowSchema.getColumnsFilter()
+        );
+        final Scan scan = latestSnapshot.getScanBuilder(tableClient).withReadSchema(tableClient, prunedSchema).build();
+        final CloseableIterator<FilteredColumnarBatch> scanFiles = scan.getScanFiles(tableClient);
+
+        scanState = scan.getScanState(tableClient);
+        scanRowList = new ArrayList<>();
+
+        while (scanFiles.hasNext()) {
+          final FilteredColumnarBatch scanFileBatch = scanFiles.next();
+          final CloseableIterator<Row> scanFileRows = scanFileBatch.getRows();
+          scanFileRows.forEachRemaining(scanRowList::add);
+        }
+      }
+      return new DeltaInputSourceReader(
+          Scan.readData(
+              tableClient,
+              scanState,
+              Utils.toCloseableIterator(scanRowList.iterator()),
+              Optional.empty()
+          ),
+          inputRowSchema
+      );
+    }
+    catch (TableNotFoundException e) {
+      throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
+    }
+    catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public Stream<InputSplit<DeltaSplit>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+  {
+    if (deltaSplit != null) {
+      // can't split a split
+      return Stream.of(new InputSplit<>(deltaSplit));
+    }
+
+    final TableClient tableClient = createTableClient();
+    final Snapshot latestSnapshot;
+    try {
+      final Table table = Table.forPath(tableClient, tablePath);
+      latestSnapshot = table.getLatestSnapshot(tableClient);
+    }
+    catch (TableNotFoundException e) {
+      throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
+    }
+    final Scan scan = latestSnapshot.getScanBuilder(tableClient).build();
+    // scan files iterator for the current snapshot
+    final CloseableIterator<FilteredColumnarBatch> scanFilesIterator = scan.getScanFiles(tableClient);
+
+    final Row scanState = scan.getScanState(tableClient);
+    final String scanStateStr = RowSerde.serializeRowToJson(scanState);
+
+    Iterator<DeltaSplit> deltaSplitIterator = Iterators.transform(
+        scanFilesIterator,
+        scanFile -> {
+          final CloseableIterator<Row> rows = scanFile.getRows();
+          final List<String> fileRows = new ArrayList<>();
+          while (rows.hasNext()) {
+            fileRows.add(RowSerde.serializeRowToJson(rows.next()));
+          }
+          return new DeltaSplit(scanStateStr, fileRows);
+        }
+    );
+
+    return Streams.sequentialStreamFrom(deltaSplitIterator).map(InputSplit::new);
+  }
+
+  @Override
+  public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
+  {
+    return Ints.checkedCast(createSplits(inputFormat, splitHintSpec).count());
+  }
+
+  @Override
+  public InputSource withSplit(InputSplit<DeltaSplit> split)
+  {
+    return new DeltaInputSource(
+        tablePath,
+        split.get()
+    );
+  }
+
+  private Row deserialize(TableClient tableClient, String row)
+  {
+    return RowSerde.deserializeRowFromJson(tableClient, row);
+  }
+
+  /**
+   * Utility method to return a pruned schema that contains the given {@code columns} from
+   * {@code baseSchema} applied by {@code columnsFilter}. This will serve as an optimization
+   * for table scans if we're interested in reading only a subset of columns from the Delta Lake table.
+   */
+  private StructType pruneSchema(final StructType baseSchema, final ColumnsFilter columnsFilter)
+  {
+    final List<String> columnNames = baseSchema.fieldNames();
+    final List<String> filteredColumnNames = columnNames
+        .stream()
+        .filter(columnsFilter::apply)
+        .collect(Collectors.toList());
+
+    if (filteredColumnNames.equals(columnNames)) {
+      return baseSchema;
+    }
+    final List<StructField> selectedFields = filteredColumnNames
+        .stream()
+        .map(baseSchema::get)
+        .collect(Collectors.toList());
+    return new StructType(selectedFields);
+  }
+
+  /**
+   * @return a table client where the client is initialized with {@link Configuration} class that uses the class's
+   * class loader instead of the context classloader. The latter by default doesn't know about the extension classes,
+   * so the table client cannot load runtime classes resulting in {@link ClassNotFoundException}.
+   */
+  private TableClient createTableClient()
+  {
+    final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader();
+    try {
+      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+      final Configuration conf = new Configuration();
+      return DefaultTableClient.create(conf);
+    }
+    finally {
+      Thread.currentThread().setContextClassLoader(currCtxClassloader);
+    }
+  }
+}
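A hedged end-to-end sketch of the reader path above. The table path and the `created_ts` timestamp column are hypothetical, and the `InputRowSchema` construction stands in for what an ingestion task would assemble from its spec:

```java
import java.util.Collections;
import org.apache.druid.data.input.ColumnsFilter;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.InputSourceReader;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.delta.input.DeltaInputSource;
import org.apache.druid.java.util.common.parsers.CloseableIterator;

public class DeltaReadSketch
{
  public static void main(String[] args) throws Exception
  {
    // No split supplied, so the reader scans the table's latest snapshot.
    final DeltaInputSource inputSource = new DeltaInputSource("/delta-table/directory", null);

    final InputRowSchema schema = new InputRowSchema(
        new TimestampSpec("created_ts", "posix", null),  // hypothetical timestamp column
        new DimensionsSpec(Collections.emptyList()),     // discover dimensions from the data
        ColumnsFilter.all()                              // no column pruning
    );

    // inputFormat and temporaryDirectory are unused by this input source.
    final InputSourceReader reader = inputSource.reader(schema, null, null);
    try (CloseableIterator<InputRow> rows = reader.read()) {
      while (rows.hasNext()) {
        System.out.println(rows.next());
      }
    }
  }
}
```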
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
new file mode 100644
index 00000000000..d0fc4780d00
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputStats;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * A reader for the Delta Lake input source. It initializes an iterator {@link DeltaInputSourceIterator}
+ * for a subset of Delta records given by {@link FilteredColumnarBatch} and schema {@link InputRowSchema}.
+ */
+public class DeltaInputSourceReader implements InputSourceReader
+{
+  private final io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator;
+  private final InputRowSchema inputRowSchema;
+
+  public DeltaInputSourceReader(
+      io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator,
+      InputRowSchema inputRowSchema
+  )
+  {
+    this.filteredColumnarBatchCloseableIterator = filteredColumnarBatchCloseableIterator;
+    this.inputRowSchema = inputRowSchema;
+  }
+
+  @Override
+  public CloseableIterator<InputRow> read()
+  {
+    return new DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator, inputRowSchema);
+  }
+
+  @Override
+  public CloseableIterator<InputRow> read(InputStats inputStats)
+  {
+    return new DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator, inputRowSchema);
+  }
+
+  @Override
+  public CloseableIterator<InputRowListPlusRawValues> sample()
+  {
+
+    CloseableIterator<InputRow> inner = read();
+    return new CloseableIterator<InputRowListPlusRawValues>()
+    {
+      @Override
+      public void close() throws IOException
+      {
+        inner.close();
+      }
+
+      @Override
+      public boolean hasNext()
+      {
+        return inner.hasNext();
+      }
+
+      @Override
+      public InputRowListPlusRawValues next()
+      {
+        DeltaInputRow deltaInputRow = (DeltaInputRow) inner.next();
+        return InputRowListPlusRawValues.of(deltaInputRow, deltaInputRow.getRawRowAsMap());
+      }
+    };
+  }
+
+  private static class DeltaInputSourceIterator implements CloseableIterator<InputRow>
+  {
+    private final io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator;
+
+    private io.delta.kernel.utils.CloseableIterator<Row> currentBatch = null;
+    private final InputRowSchema inputRowSchema;
+
+    public DeltaInputSourceIterator(
+        io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch> filteredColumnarBatchCloseableIterator,
+        InputRowSchema inputRowSchema
+    )
+    {
+      this.filteredColumnarBatchCloseableIterator = filteredColumnarBatchCloseableIterator;
+      this.inputRowSchema = inputRowSchema;
+    }
+
+    @Override
+    public boolean hasNext()
+    {
+      while (currentBatch == null || !currentBatch.hasNext()) {
+        if (!filteredColumnarBatchCloseableIterator.hasNext()) {
+          return false; // No more batches or records to read!
+        }
+        currentBatch = filteredColumnarBatchCloseableIterator.next().getRows();
+      }
+      return true;
+    }
+
+    @Override
+    public InputRow next()
+    {
+      if (!hasNext()) {
+        throw new NoSuchElementException();
+      }
+
+      Row dataRow = currentBatch.next();
+      return new DeltaInputRow(dataRow, inputRowSchema);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+      filteredColumnarBatchCloseableIterator.close();
+    }
+  }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java
new file mode 100644
index 00000000000..7ab52cef089
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * An input split of a Delta table containing the following information:
+ * <li>
+ * {@code stateRow} represents the canonical JSON representation of the latest snapshot of the Delta table.
+ * </li>
+ * <li>
+ * {@code files} represents the list of files from the latest snapshot.
+ * </li>
+ */
+public class DeltaSplit
+{
+  private final String stateRow;
+  private final List<String> files;
+
+  @JsonCreator
+  public DeltaSplit(
+      @JsonProperty("state") final String stateRow,
+      @JsonProperty("files") final List<String> files
+  )
+  {
+    this.stateRow = stateRow;
+    this.files = files;
+  }
+
+  @JsonProperty("state")
+  public String getStateRow()
+  {
+    return stateRow;
+  }
+
+  @JsonProperty("files")
+  public List<String> getFiles()
+  {
+    return files;
+  }
+
+  @Override
+  public String toString()
+  {
+    return "DeltaSplit{" +
+           "stateRow=" + stateRow +
+           ", files=" + files +
+           "}";
+  }
+}
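For orientation, a sketch of the wire shape Jackson produces for a split. The state and file payloads are abbreviated stand-ins for the JSON strings that `RowSerde` emits:

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.util.Collections;
import org.apache.druid.delta.input.DeltaSplit;

public class DeltaSplitSketch
{
  public static void main(String[] args) throws Exception
  {
    // Abbreviated payloads; real values are full RowSerde JSON strings.
    final DeltaSplit split = new DeltaSplit(
        "{\"schema\":\"...\",\"row\":{}}",
        Collections.singletonList("{\"schema\":\"...\",\"row\":{}}")
    );
    // Prints roughly: {"state":"{...}","files":["{...}"]}
    System.out.println(new ObjectMapper().writeValueAsString(split));
  }
}
```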
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java
new file mode 100644
index 00000000000..f4742894402
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
+public class DeltaTimeUtils
+{
+  private static final ZoneId ZONE_ID = ZoneId.systemDefault();
+
+  /**
+   * {@link io.delta.kernel.types.TimestampType} data in Delta Lake tables is stored internally as the number of
+   * microseconds since epoch.
+   *
+   * @param microSecsSinceEpochUTC microseconds since epoch
+   * @return Datetime millis corresponding to {@code microSecsSinceEpochUTC}
+   */
+  public static long getMillisFromTimestamp(final long microSecsSinceEpochUTC)
+  {
+    final LocalDateTime dateTime = LocalDateTime.ofEpochSecond(
+        microSecsSinceEpochUTC / 1_000_000 /* epochSecond */,
+        (int) (1000 * (microSecsSinceEpochUTC % 1_000_000)) /* nanoOfSecond */,
+        ZoneOffset.UTC
+    );
+    return dateTime.atZone(ZONE_ID).toInstant().toEpochMilli();
+  }
+
+  /**
+   * {@link io.delta.kernel.types.DateType} data in Delta Lake tables is stored internally as the number of
+   * days since epoch.
+   *
+   * @param daysSinceEpochUTC number of days since epoch
+   * @return number of seconds corresponding to {@code daysSinceEpochUTC}.
+   */
+  public static long getSecondsFromDate(final int daysSinceEpochUTC)
+  {
+    return LocalDate.ofEpochDay(daysSinceEpochUTC).atStartOfDay(ZONE_ID).toEpochSecond();
+  }
+}
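A quick sanity check of the two conversions with hand-computed values. The millisecond expectation assumes the JVM default zone is UTC, since `ZONE_ID` is the system default:

```java
public class DeltaTimeSketch
{
  public static void main(String[] args)
  {
    // 1_700_000_000_500_000 microseconds = 1_700_000_000.5 seconds since epoch;
    // with a UTC default zone this prints 1700000000500.
    System.out.println(DeltaTimeUtils.getMillisFromTimestamp(1_700_000_000_500_000L));

    // 19_000 days after 1970-01-01 is 2022-01-08; prints the epoch seconds of
    // that day's midnight in the system default zone.
    System.out.println(DeltaTimeUtils.getSecondsFromDate(19_000));
  }
}
```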
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java
new file mode 100644
index 00000000000..f10ac0574e9
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
+import io.delta.kernel.internal.types.TableSchemaSerDe;
+import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.jackson.DefaultObjectMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class to serialize and deserialize a {@link Row} object.
+ * Code borrowed from <a href="https://github.com/delta-io/delta/blob/master/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/utils/RowSerDe.java">
+ * RowSerde.java</a>. The only differences between the two classes are the code style and exception handling in
+ * {@link #convertRowToJsonObject}, where we use {@link org.apache.druid.error.DruidException} instead of
+ * {@link UnsupportedOperationException}.
+ */
+public class RowSerde
+{
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+
+  private RowSerde()
+  {
+  }
+
+  /**
+   * Utility method to serialize a {@link Row} as a JSON string
+   */
+  public static String serializeRowToJson(Row row)
+  {
+    Map<String, Object> rowObject = convertRowToJsonObject(row);
+    try {
+      Map<String, Object> rowWithSchema = new HashMap<>();
+      rowWithSchema.put("schema", TableSchemaSerDe.toJson(row.getSchema()));
+      rowWithSchema.put("row", rowObject);
+      return OBJECT_MAPPER.writeValueAsString(rowWithSchema);
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /**
+   * Utility method to deserialize a {@link Row} object from the JSON form.
+   */
+  public static Row deserializeRowFromJson(TableClient tableClient, String jsonRowWithSchema)
+  {
+    try {
+      JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
+      JsonNode schemaNode = jsonNode.get("schema");
+      StructType schema =
+          TableSchemaSerDe.fromJson(tableClient.getJsonHandler(), schemaNode.asText());
+      return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema);
+    }
+    catch (JsonProcessingException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  public static Map<String, Object> convertRowToJsonObject(Row row)
+  {
+    StructType rowType = row.getSchema();
+    Map<String, Object> rowObject = new HashMap<>();
+    for (int fieldId = 0; fieldId < rowType.length(); fieldId++) {
+      StructField field = rowType.at(fieldId);
+      DataType fieldType = field.getDataType();
+      String name = field.getName();
+
+      if (row.isNullAt(fieldId)) {
+        rowObject.put(name, null);
+        continue;
+      }
+
+      Object value;
+      if (fieldType instanceof BooleanType) {
+        value = row.getBoolean(fieldId);
+      } else if (fieldType instanceof ByteType) {
+        value = row.getByte(fieldId);
+      } else if (fieldType instanceof ShortType) {
+        value = row.getShort(fieldId);
+      } else if (fieldType instanceof IntegerType) {
+        value = row.getInt(fieldId);
+      } else if (fieldType instanceof LongType) {
+        value = row.getLong(fieldId);
+      } else if (fieldType instanceof FloatType) {
+        value = row.getFloat(fieldId);
+      } else if (fieldType instanceof DoubleType) {
+        value = row.getDouble(fieldId);
+      } else if (fieldType instanceof DateType) {
+        value = DeltaTimeUtils.getSecondsFromDate(row.getInt(fieldId));
+      } else if (fieldType instanceof TimestampType) {
+        value = DeltaTimeUtils.getMillisFromTimestamp(row.getLong(fieldId));
+      } else if (fieldType instanceof StringType) {
+        value = row.getString(fieldId);
+      } else if (fieldType instanceof ArrayType) {
+        value = VectorUtils.toJavaList(row.getArray(fieldId));
+      } else if (fieldType instanceof MapType) {
+        value = VectorUtils.toJavaMap(row.getMap(fieldId));
+      } else if (fieldType instanceof StructType) {
+        Row subRow = row.getStruct(fieldId);
+        value = convertRowToJsonObject(subRow);
+      } else {
+        throw InvalidInput.exception("Unsupported fieldType[%s] for 
fieldName[%s]", fieldType, name);
+      }
+
+      rowObject.put(name, value);
+    }
+
+    return rowObject;
+  }
+
+  private static Row parseRowFromJsonWithSchema(ObjectNode rowJsonNode, StructType rowType)
+  {
+    return new DefaultJsonRow(rowJsonNode, rowType);
+  }
+}
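A hedged round-trip sketch for the serde pair above, following the same Kernel calls `DeltaInputSource` makes when it serializes scan state into splits; the table path is hypothetical:

```java
import io.delta.kernel.Scan;
import io.delta.kernel.Table;
import io.delta.kernel.client.TableClient;
import io.delta.kernel.data.Row;
import io.delta.kernel.defaults.client.DefaultTableClient;
import org.apache.druid.delta.input.RowSerde;
import org.apache.hadoop.conf.Configuration;

public class RowSerdeSketch
{
  public static void main(String[] args) throws Exception
  {
    final TableClient tableClient = DefaultTableClient.create(new Configuration());
    final Table table = Table.forPath(tableClient, "/delta-table/directory");
    final Scan scan = table.getLatestSnapshot(tableClient).getScanBuilder(tableClient).build();

    // Serialize the scan state Row (schema + values) to JSON and read it back.
    final Row scanState = scan.getScanState(tableClient);
    final String json = RowSerde.serializeRowToJson(scanState);
    final Row roundTripped = RowSerde.deserializeRowFromJson(tableClient, json);

    // The reconstructed Row should carry the same schema as the original.
    System.out.println(roundTripped.getSchema().equals(scanState.getSchema()));
  }
}
```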
diff --git a/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule b/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 00000000000..b2752bd862c
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.delta.common.DeltaLakeDruidModule
\ No newline at end of file
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java
new file mode 100644
index 00000000000..6f68774dbd6
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.utils.CloseableIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
+public class DeltaInputRowTest
+{
+  @Test
+  public void testDeltaInputRow() throws TableNotFoundException, IOException
+  {
+    final TableClient tableClient = DefaultTableClient.create(new Configuration());
+    final Scan scan = DeltaTestUtils.getScan(tableClient);
+
+    CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);
+    int totalRecordCount = 0;
+    while (scanFileIter.hasNext()) {
+      try (CloseableIterator<FilteredColumnarBatch> data =
+               Scan.readData(
+                   tableClient,
+                   scan.getScanState(tableClient),
+                   scanFileIter.next().getRows(),
+                   Optional.empty()
+               )) {
+        while (data.hasNext()) {
+          FilteredColumnarBatch dataReadResult = data.next();
+          Row next = dataReadResult.getRows().next();
+          DeltaInputRow deltaInputRow = new DeltaInputRow(
+              next,
+              DeltaTestUtils.FULL_SCHEMA
+          );
+          Assert.assertNotNull(deltaInputRow);
+          Assert.assertEquals(DeltaTestUtils.DIMENSIONS, deltaInputRow.getDimensions());
+
+          Map<String, Object> expectedRow = DeltaTestUtils.EXPECTED_ROWS.get(totalRecordCount);
+          for (String key : expectedRow.keySet()) {
+            if (DeltaTestUtils.FULL_SCHEMA.getTimestampSpec().getTimestampColumn().equals(key)) {
+              final long expectedMillis = ((Long) expectedRow.get(key)) * 1000;
+              Assert.assertEquals(expectedMillis, deltaInputRow.getTimestampFromEpoch());
+            } else {
+              Assert.assertEquals(expectedRow.get(key), deltaInputRow.getRaw(key));
+            }
+          }
+          totalRecordCount += 1;
+        }
+      }
+    }
+    Assert.assertEquals(DeltaTestUtils.EXPECTED_ROWS.size(), totalRecordCount);
+  }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java
new file mode 100644
index 00000000000..24b1096abe1
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class DeltaInputSourceTest
+{
+  @Before
+  public void setUp()
+  {
+    System.setProperty("user.timezone", "UTC");
+  }
+
+  @Test
+  public void testSampleDeltaTable() throws IOException
+  {
+    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+    final InputSourceReader inputSourceReader = deltaInputSource.reader(DeltaTestUtils.FULL_SCHEMA, null, null);
+
+    List<InputRowListPlusRawValues> actualSampledRows = sampleAllRows(inputSourceReader);
+    Assert.assertEquals(DeltaTestUtils.EXPECTED_ROWS.size(), actualSampledRows.size());
+
+    for (int idx = 0; idx < DeltaTestUtils.EXPECTED_ROWS.size(); idx++) {
+      Map<String, Object> expectedRow = DeltaTestUtils.EXPECTED_ROWS.get(idx);
+      InputRowListPlusRawValues actualSampledRow = actualSampledRows.get(idx);
+      Assert.assertNull(actualSampledRow.getParseException());
+
+      Map<String, Object> actualSampledRawVals = actualSampledRow.getRawValues();
+      Assert.assertNotNull(actualSampledRawVals);
+      Assert.assertNotNull(actualSampledRow.getRawValuesList());
+      Assert.assertEquals(1, actualSampledRow.getRawValuesList().size());
+
+      for (String key : expectedRow.keySet()) {
+        if (DeltaTestUtils.FULL_SCHEMA.getTimestampSpec().getTimestampColumn().equals(key)) {
+          final long expectedMillis = (Long) expectedRow.get(key);
+          Assert.assertEquals(expectedMillis, actualSampledRawVals.get(key));
+        } else {
+          Assert.assertEquals(expectedRow.get(key), actualSampledRawVals.get(key));
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testReadAllDeltaTable() throws IOException
+  {
+    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+    final InputSourceReader inputSourceReader = deltaInputSource.reader(
+        DeltaTestUtils.FULL_SCHEMA,
+        null,
+        null
+    );
+    final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
+    validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.FULL_SCHEMA);
+  }
+
+  @Test
+  public void testReadAllDeltaTableSubSchema1() throws IOException
+  {
+    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+    final InputSourceReader inputSourceReader = deltaInputSource.reader(
+        DeltaTestUtils.SCHEMA_1,
+        null,
+        null
+    );
+    final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
+    validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.SCHEMA_1);
+  }
+
+  @Test
+  public void testReadAllDeltaTableWithSubSchema2() throws IOException
+  {
+    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+    final InputSourceReader inputSourceReader = deltaInputSource.reader(
+        DeltaTestUtils.SCHEMA_2,
+        null,
+        null
+    );
+    final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
+    validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.SCHEMA_2);
+  }
+
+  @Test
+  public void testDeltaLakeWithCreateSplits()
+  {
+    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+    final List<InputSplit<DeltaSplit>> splits = deltaInputSource.createSplits(null, null)
+                                                                .collect(Collectors.toList());
+    Assert.assertEquals(DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.size(), splits.size());
+
+    for (InputSplit<DeltaSplit> split : splits) {
+      final DeltaSplit deltaSplit = split.get();
+      final DeltaInputSource deltaInputSourceWithSplit = new DeltaInputSource(
+          DeltaTestUtils.DELTA_TABLE_PATH,
+          deltaSplit
+      );
+      List<InputSplit<DeltaSplit>> splitsResult = deltaInputSourceWithSplit.createSplits(null, null)
+                                                                           .collect(Collectors.toList());
+      Assert.assertEquals(1, splitsResult.size());
+      Assert.assertEquals(deltaSplit, splitsResult.get(0).get());
+    }
+  }
+
+  @Test
+  public void testDeltaLakeWithReadSplits() throws IOException
+  {
+    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
+    final List<InputSplit<DeltaSplit>> splits = deltaInputSource.createSplits(null, null)
+                                                                .collect(Collectors.toList());
+    Assert.assertEquals(DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.size(), splits.size());
+
+    for (int idx = 0; idx < splits.size(); idx++) {
+      final InputSplit<DeltaSplit> split = splits.get(idx);
+      final DeltaSplit deltaSplit = split.get();
+      final DeltaInputSource deltaInputSourceWithSplit = new DeltaInputSource(
+          DeltaTestUtils.DELTA_TABLE_PATH,
+          deltaSplit
+      );
+      final InputSourceReader inputSourceReader = deltaInputSourceWithSplit.reader(
+          DeltaTestUtils.FULL_SCHEMA,
+          null,
+          null
+      );
+      final List<InputRow> actualRowsInSplit = readAllRows(inputSourceReader);
+      final List<Map<String, Object>> expectedRowsInSplit = DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.get(idx);
+      validateRows(expectedRowsInSplit, actualRowsInSplit, DeltaTestUtils.FULL_SCHEMA);
+    }
+  }
+
+  @Test
+  public void testNullTable()
+  {
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> new DeltaInputSource(null, null)
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            "tablePath cannot be null."
+        )
+    );
+  }
+
+  @Test
+  public void testSplitNonExistentTable()
+  {
+    final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null);
+
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> deltaInputSource.createSplits(null, null)
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            "tablePath[non-existent-table] not found."
+        )
+    );
+  }
+
+  @Test
+  public void testReadNonExistentTable()
+  {
+    final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null);
+
+    MatcherAssert.assertThat(
+        Assert.assertThrows(
+            DruidException.class,
+            () -> deltaInputSource.reader(null, null, null)
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageIs(
+            "tablePath[non-existent-table] not found."
+        )
+    );
+  }
+
+  private List<InputRowListPlusRawValues> sampleAllRows(InputSourceReader reader) throws IOException
+  {
+    List<InputRowListPlusRawValues> rows = new ArrayList<>();
+    try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
+      iterator.forEachRemaining(rows::add);
+    }
+    return rows;
+  }
+
+  private List<InputRow> readAllRows(InputSourceReader reader) throws IOException
+  {
+    final List<InputRow> rows = new ArrayList<>();
+    try (CloseableIterator<InputRow> iterator = reader.read()) {
+      iterator.forEachRemaining(rows::add);
+    }
+    return rows;
+  }
+
+  private void validateRows(
+      final List<Map<String, Object>> expectedRows,
+      final List<InputRow> actualReadRows,
+      final InputRowSchema schema
+  )
+  {
+    Assert.assertEquals(expectedRows.size(), actualReadRows.size());
+
+    for (int idx = 0; idx < expectedRows.size(); idx++) {
+      final Map<String, Object> expectedRow = expectedRows.get(idx);
+      final InputRow actualInputRow = actualReadRows.get(idx);
+      for (String key : expectedRow.keySet()) {
+        if (!schema.getColumnsFilter().apply(key)) {
+          Assert.assertNull(actualInputRow.getRaw(key));
+        } else {
+          if (schema.getTimestampSpec().getTimestampColumn().equals(key)) {
+            final long expectedMillis = (Long) expectedRow.get(key) * 1000;
+            Assert.assertEquals(expectedMillis, actualInputRow.getTimestampFromEpoch());
+            Assert.assertEquals(DateTimes.utc(expectedMillis), actualInputRow.getTimestamp());
+          } else {
+            Assert.assertEquals(expectedRow.get(key), actualInputRow.getRaw(key));
+          }
+        }
+      }
+    }
+  }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java
new file mode 100644
index 00000000000..180adaefcfb
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.delta.kernel.Scan;
+import io.delta.kernel.ScanBuilder;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.types.StructType;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Refer to extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md to generate the
+ * sample Delta Lake table used in the unit tests.
+ */
+public class DeltaTestUtils
+{
+  /**
+   * The Delta table path used by unit tests.
+   */
+  public static final String DELTA_TABLE_PATH = "src/test/resources/employee-delta-table";
+  /**
+   * The list of dimensions in the Delta table {@link #DELTA_TABLE_PATH}.
+   */
+  public static final List<String> DIMENSIONS = ImmutableList.of(
+      "id",
+      "birthday",
+      "name",
+      "age",
+      "salary",
+      "bonus",
+      "yoe",
+      "is_fulltime",
+      "last_vacation_time"
+  );
+
+  /**
+   * The expected set of rows from the first checkpoint file {@code DELTA_TABLE_PATH/_delta_log/00000000000000000000.json}
+   */
+  private static final List<Map<String, Object>> SPLIT_0_EXPECTED_ROWS = new ArrayList<>(
+      ImmutableList.of(
+          ImmutableMap.of(
+              "birthday", 1057881600L,
+              "name", "Employee1",
+              "id", 867799346L,
+              "salary", 87642.55209817083,
+              "age", (short) 20,
+              "yoe", 4
+          ),
+          ImmutableMap.of(
+              "birthday", 1035417600L,
+              "is_fulltime", false,
+              "name", "Employee2",
+              "id", 9963151889L,
+              "salary", 79404.63969727767,
+              "age", (short) 21,
+              "yoe", 2
+          ),
+          ImmutableMap.of(
+              "birthday", 890179200L,
+              "name", "Employee3",
+              "id", 2766777393L,
+              "salary", 92418.21424435009,
+              "age", (short) 25,
+              "yoe", 9
+          ),
+          ImmutableMap.of(
+              "birthday", 1073001600L,
+              "name", "Employee4",
+              "id", 6320361986L,
+              "salary", 97907.76612488469,
+              "age", (short) 20,
+              "yoe", 3
+          ),
+          ImmutableMap.of(
+              "birthday", 823996800L,
+              "is_fulltime", true,
+              "bonus", 4982.215f,
+              "name", "Employee5",
+              "id", 7068152260L,
+              "salary", 79037.77202099308,
+              "last_vacation_time", 1706256972000L,
+              "age", (short) 27,
+              "yoe", 9
+          )
+      )
+  );
+
+  /**
+   * The expected rows from the second checkpoint file {@code DELTA_TABLE_PATH/_delta_log/00000000000000000001.json}
+   */
+  private static final List<Map<String, Object>> SPLIT_1_EXPECTED_ROWS = new ArrayList<>(
+      ImmutableList.of(
+          ImmutableMap.of(
+              "birthday", 937526400L,
+              "is_fulltime", false,
+              "name", "Employee1",
+              "id", 4693651733L,
+              "salary", 83845.11357786917,
+              "age", (short) 24,
+              "yoe", 3
+          ),
+          ImmutableMap.of(
+              "birthday", 810777600L,
+              "is_fulltime", false,
+              "name", "Employee2",
+              "id", 7132772589L,
+              "salary", 90140.44051385639,
+              "age", (short) 28,
+              "yoe", 8
+          ),
+          ImmutableMap.of(
+              "birthday", 1104969600L,
+              "is_fulltime", true,
+              "bonus", 3699.0881f,
+              "name", "Employee3",
+              "id", 6627278510L,
+              "salary", 58857.27649436368,
+              "last_vacation_time", 1706458554000L,
+              "age", (short) 19,
+              "yoe", 4
+          ),
+          ImmutableMap.of(
+              "birthday", 763257600L,
+              "is_fulltime", true,
+              "bonus", 2334.6675f,
+              "name", "Employee4",
+              "id", 4786204912L,
+              "salary", 93646.81222022788,
+              "last_vacation_time", 1706390154000L,
+              "age", (short) 29,
+              "yoe", 5
+          ),
+          ImmutableMap.of(
+              "birthday", 1114646400L,
+              "name", "Employee5",
+              "id", 2773939764L,
+              "salary", 66300.05339373322,
+              "age", (short) 18,
+              "yoe", 3
+          ),
+          ImmutableMap.of(
+              "birthday", 913334400L,
+              "is_fulltime", false,
+              "name", "Employee6",
+              "id", 8333438088L,
+              "salary", 59219.5257906128,
+              "age", (short) 25,
+              "yoe", 4
+          ),
+          ImmutableMap.of(
+              "birthday", 893894400L,
+              "is_fulltime", false,
+              "name", "Employee7",
+              "id", 8397454007L,
+              "salary", 61909.733851830584,
+              "age", (short) 25,
+              "yoe", 8
+          ),
+          ImmutableMap.of(
+              "birthday", 1038873600L,
+              "is_fulltime", true,
+              "bonus", 3000.0154f,
+              "name", "Employee8",
+              "id", 8925359945L,
+              "salary", 76588.05471316943,
+              "last_vacation_time", 1706195754000L,
+              "age", (short) 21,
+              "yoe", 1
+          ),
+          ImmutableMap.of(
+              "birthday", 989798400L,
+              "is_fulltime", true,
+              "bonus", 4463.3833f,
+              "name", "Employee9",
+              "id", 8154788551L,
+              "salary", 59787.98539015684,
+              "last_vacation_time", 1706181354000L,
+              "age", (short) 22,
+              "yoe", 4
+          ),
+          ImmutableMap.of(
+              "birthday", 912297600L,
+              "is_fulltime", false,
+              "name", "Employee10",
+              "id", 5884382356L,
+              "salary", 51565.91965119349,
+              "age", (short) 25,
+              "yoe", 9
+          )
+      )
+  );
+
+  /**
+   * Mapping of checkpoint file identifier to the list of expected rows in that checkpoint.
+   */
+  public static final Map<Integer, List<Map<String, Object>>> SPLIT_TO_EXPECTED_ROWS = new HashMap<>(
+      ImmutableMap.of(
+          0, SPLIT_0_EXPECTED_ROWS,
+          1, SPLIT_1_EXPECTED_ROWS
+      )
+  );
+
+  /**
+   * Complete set of expected rows across all checkpoint files for {@link 
#DELTA_TABLE_PATH}.
+   */
+  public static final List<Map<String, Object>> EXPECTED_ROWS = 
SPLIT_TO_EXPECTED_ROWS.values().stream()
+                                                                               
       .flatMap(List::stream)
+                                                                               
       .collect(Collectors.toList());
+
+  /**
+   * The Druid schema used for ingestion of {@link #DELTA_TABLE_PATH}.
+   */
+  public static final InputRowSchema FULL_SCHEMA = new InputRowSchema(
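+      // "posix" indicates the birthday column holds epoch seconds; Druid scales it to millis internally.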
+      new TimestampSpec("birthday", "posix", null),
+      new DimensionsSpec(
+          ImmutableList.of(
+              new LongDimensionSchema("id"),
+              new LongDimensionSchema("birthday"),
+              new StringDimensionSchema("name"),
+              new LongDimensionSchema("age"),
+              new DoubleDimensionSchema("salary"),
+              new FloatDimensionSchema("bonus"),
+              new LongDimensionSchema("yoe"),
+              new StringDimensionSchema("is_fulltime"),
+              new LongDimensionSchema("last_vacation_time")
+          )
+      ),
+      ColumnsFilter.all()
+  );
+
+  /**
+   * Similar to {@link #FULL_SCHEMA}, but with a smaller set of columns with an inclusion filter applied.
+   */
+  public static final InputRowSchema SCHEMA_1 = new InputRowSchema(
+      new TimestampSpec("birthday", "posix", null),
+      new DimensionsSpec(
+          ImmutableList.of(
+              new LongDimensionSchema("id"),
+              new LongDimensionSchema("birthday"),
+              new StringDimensionSchema("name"),
+              new LongDimensionSchema("age"),
+              new DoubleDimensionSchema("salary"),
+              new FloatDimensionSchema("bonus"),
+              new LongDimensionSchema("yoe"),
+              new StringDimensionSchema("is_fulltime"),
+              new LongDimensionSchema("last_vacation_time")
+          )
+      ),
+      ColumnsFilter.inclusionBased(ImmutableSet.of("id", "birthday", "name", "is_fulltime"))
+  );
+
+  /**
+   * Similar to {@link #FULL_SCHEMA}, but with a smaller set of columns with an exclusion filter applied. A non-existent
+   * column is added to the exclusion filter - it should silently get thrown away.
+   */
+  public static final InputRowSchema SCHEMA_2 = new InputRowSchema(
+      new TimestampSpec("birthday", "posix", null),
+      new DimensionsSpec(
+          ImmutableList.of(
+              new LongDimensionSchema("id"),
+              new LongDimensionSchema("birthday"),
+              new StringDimensionSchema("name"),
+              new LongDimensionSchema("age"),
+              new DoubleDimensionSchema("salary"),
+              new FloatDimensionSchema("bonus"),
+              new LongDimensionSchema("yoe"),
+              new StringDimensionSchema("is_fulltime"),
+              new LongDimensionSchema("last_vacation_time")
+          )
+      ),
+      ColumnsFilter.exclusionBased(ImmutableSet.of("last_vacation_time", "bonus", "non_existent_column"))
+  );
+
+  /**
+   * A simple wrapper that builds the table scan for {@link #DELTA_TABLE_PATH} meant to be used in tests.
+   */
+  public static Scan getScan(final TableClient tableClient) throws TableNotFoundException
+  {
+    final Table table = Table.forPath(tableClient, DELTA_TABLE_PATH);
+    final Snapshot snapshot = table.getLatestSnapshot(tableClient);
+    final StructType readSchema = snapshot.getSchema(tableClient);
+    final ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient)
+                                            .withReadSchema(tableClient, readSchema);
+    return scanBuilder.build();
+  }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTimeUtilsTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTimeUtilsTest.java
new file mode 100644
index 00000000000..de78ed97aa3
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTimeUtilsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Instant;
+
+public class DeltaTimeUtilsTest
+{
+  @Before
+  public void setUp()
+  {
+    System.setProperty("user.timezone", "UTC");
+  }
+
+  @Test
+  public void testTimestampValue()
+  {
+    Assert.assertEquals(
+        Instant.parse("2018-02-02T00:28:02.000Z"),
+        Instant.ofEpochMilli(
+            DeltaTimeUtils.getMillisFromTimestamp(
+                Instant.parse("2018-02-02T00:28:02.000Z").toEpochMilli() * 
1_000
+            )
+        )
+    );
+
+    Assert.assertEquals(
+        Instant.parse("2024-01-31T00:58:03.000Z"),
+        Instant.ofEpochMilli(
+            DeltaTimeUtils.getMillisFromTimestamp(
+                Instant.parse("2024-01-31T00:58:03.002Z").toEpochMilli() * 
1_000
+            )
+        )
+    );
+  }
+
+  @Test
+  public void testDateTimeValue()
+  {
+    Assert.assertEquals(
+        Instant.parse("2020-02-01T00:00:00.000Z"),
+        Instant.ofEpochSecond(
+            DeltaTimeUtils.getSecondsFromDate(
+                (int) Intervals.of("1970-01-01/2020-02-01").toDuration().getStandardDays()
+            )
+        )
+    );
+
+    Assert.assertEquals(
+        Instant.parse("2024-01-01T00:00:00.000Z"),
+        Instant.ofEpochSecond(
+            DeltaTimeUtils.getSecondsFromDate(
+                (int) Intervals.of("1970-01-01/2024-01-01T02:23:00").toDuration().getStandardDays()
+            )
+        )
+    );
+  }
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java
new file mode 100644
index 00000000000..eb06f532a02
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.client.DefaultTableClient;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RowSerdeTest
+{
+  @Test
+  public void testSerializeDeserializeRoundtrip() throws TableNotFoundException
+  {
+    final DefaultTableClient tableClient = DefaultTableClient.create(new Configuration());
+    final Scan scan = DeltaTestUtils.getScan(tableClient);
+    final Row scanState = scan.getScanState(tableClient);
+
+    final String rowJson = RowSerde.serializeRowToJson(scanState);
+    final Row row = RowSerde.deserializeRowFromJson(tableClient, rowJson);
+
+    Assert.assertEquals(scanState.getSchema(), row.getSchema());
+  }
+
+}
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md b/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md
new file mode 100644
index 00000000000..c524adec4b5
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md
@@ -0,0 +1,68 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+### Generate Delta Table for Unit Tests
+
+To test Delta Lake ingestion, use the Python script `create_delta_table.py` to generate a sample Delta table.
+Create a conda env `delta_test` with all the requirements from `requirements.txt` installed:
+```shell
+conda create --name delta_test --file requirements.txt
+```
+
+To activate the environment:
+
+```shell
+conda activate delta_test
+```
+
+From the conda environment, you can run the Python script:
+
+```shell
+python3 create_delta_table.py
+```
+
+By default, the script uses `overwrite` mode to generate 10 random records and writes the
+Delta table to `resources/employee-delta-table`. You can override the defaults by supplying
+command line arguments, as shown in the example after the help output below:
+
+```shell
+python3 create_delta_table.py -h
+
+usage: create_delta_table.py [-h] [--save_mode {append,overwrite}] [--save_path SAVE_PATH] [--num_records NUM_RECORDS]
+
+Script to write a Delta Lake table.
+
+optional arguments:
+  -h, --help            show this help message and exit
+  --save_mode {append,overwrite}
+                        Specify write mode (append/overwrite) (default: overwrite)
+  --save_path SAVE_PATH
+                        Save path for Delta table (default: <DRUID_BASE_PATH>/druid/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table)
+  --num_records NUM_RECORDS
+                        Specify number of Delta records to write (default: 10)
+```
+
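+For example, to append 20 more records to a table at a custom location (the path below is illustrative):
+
+```shell
+python3 create_delta_table.py --save_mode=append --num_records=20 --save_path=/tmp/employee-delta-table
+```
+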
+The test data in `resources/employee-delta-table` was generated by:
+```shell
+python3 create_delta_table.py
+python3 create_delta_table.py --num_records=5 --save_mode=append
+```
+
+This creates a total of 15 Delta records across two transactional commits. The resulting Delta table is checked
+into the repo. The expected rows in `DeltaTestUtils.java` are updated accordingly.
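+
+Each run adds one commit to the table's transaction log, so the two commands above produce the two commit files
+checked in under `employee-delta-table/_delta_log/`. A quick way to confirm this after regenerating the table:
+
+```shell
+ls employee-delta-table/_delta_log/*.json
+```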
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py b/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py
new file mode 100755
index 00000000000..ab9ec87fb00
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py
@@ -0,0 +1,122 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import argparse
+import delta
+import pyspark
+from pyspark.sql.types import StructType, StructField, ShortType, StringType, TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType, BooleanType
+from datetime import datetime, timedelta
+import random
+
+
+def config_spark_with_delta_lake():
+    builder = (
+        pyspark.sql.SparkSession.builder.appName("DeltaLakeApp")
+        .config("spark.sql.extensions", 
"io.delta.sql.DeltaSparkSessionExtension")
+        .config(
+            "spark.sql.catalog.spark_catalog",
+            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+        )
+    )
+    spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()
+    spark.sparkContext.setLogLevel("ERROR")
+    return spark
+
+
+def create_dataset(num_records):
+    """
+    Generate a mock employee dataset with different datatypes for testing purposes.
+
+    Parameters:
+    - num_records (int): Number of records to generate.
+
+    Returns:
+    - Tuple: A tuple containing a list of records and the corresponding schema.
+      - List of Records: Each record is a tuple representing a row of data.
+      - StructType: The schema defining the structure of the records.
+
+    Example:
+    ```python
+    data, schema = create_dataset(10)
+    ```
+    """
+    schema = StructType([
+        StructField("id", LongType(), False),
+        StructField("birthday", DateType(), False),
+        StructField("name", StringType(), True),
+        StructField("age", ShortType(), True),
+        StructField("salary", DoubleType(), True),
+        StructField("bonus", FloatType(), True),
+        StructField("yoe", IntegerType(), True),
+        StructField("is_fulltime", BooleanType(), True),
+        StructField("last_vacation_time", TimestampType(), True)
+    ])
+
+    data = []
+    current_date = datetime.now()
+
+    for i in range(num_records):
+        birthday = current_date - timedelta(days=random.randint(365 * 18, 365 * 30))
+        age = (current_date - birthday).days // 365
+        is_fulltime = random.choice([True, False, None])
+        record = (
+            random.randint(1, 10000000000),
+            birthday,
+            f"Employee{i+1}",
+            age,
+            random.uniform(50000, 100000),
+            random.uniform(1000, 5000) if is_fulltime else None,
+            random.randint(1, min(20, age - 15)),
+            is_fulltime,
+            datetime.now() - timedelta(hours=random.randint(1, 90)) if is_fulltime else None,
+        )
+        data.append(record)
+    return data, schema
+
+
+def main():
+    parser = argparse.ArgumentParser(description="Script to write a Delta Lake table.",
+                                     formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+
+    parser.add_argument('--save_mode', choices=('append', 'overwrite'), default="overwrite",
+                        help="Specify write mode (append/overwrite)")
+    parser.add_argument('--save_path', default=os.path.join(os.getcwd(), "employee-delta-table"),
+                        help="Save path for Delta table")
+    parser.add_argument('--num_records', type=int, default=10,
+                        help="Specify number of Delta records to write")
+
+    args = parser.parse_args()
+
+    save_mode = args.save_mode
+    save_path = args.save_path
+    num_records = args.num_records
+
+    spark = config_spark_with_delta_lake()
+
+    data, schema = create_dataset(num_records=num_records)
+    df = spark.createDataFrame(data, schema=schema)
+    df.write.format("delta").mode(save_mode).save(save_path)
+
+    df.show()
+
+    print(f"Generated Delta records to {save_path} in {save_mode} mode with 
{num_records} records.")
+
+
+if __name__ == "__main__":
+    main()
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..33d75bafdc5
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..3bdcd64c34e
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..f7e53b3258b
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..7de25e042fc
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..e6c6dc8bf9e
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..693057b9126
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..81f3070f023
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..3a8e19d5f78
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..16b0da7f9b2
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..2f8bfc23a92
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..b11e9d379d1
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..a0daef3f2b6
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..96e2c291bf2
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..ba21f2ab460
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..c55ee1b40bc
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..57a765a96ee
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000000.json.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000000.json.crc
new file mode 100644
index 00000000000..75a1c6db2e3
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000000.json.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000001.json.crc b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000001.json.crc
new file mode 100644
index 00000000000..e026fad26ba
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000001.json.crc differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000000.json b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000000.json
new file mode 100644
index 00000000000..4a07c503c43
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000000.json
@@ -0,0 +1,13 @@
+{"commitInfo":{"timestamp":1706498159640,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"10","numOutputRows":"10","numOutputBytes":"23710"},"engineInfo":"Apache-Spark/3.5.0
 Delta-Lake/3.0.0","txnId":"5557f0e2-6cdc-4249-bd19-368bec96751d"}}
+{"metaData":{"id":"c29741f1-4649-43a8-b2d9-9d46c42c58e1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"double\",\"nullab
 [...]
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"add":{"path":"part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet","partitionValues":{},"size":2316,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4693651733,\"birthday\":\"1999-09-17\",\"name\":\"Employee1\",\"age\":24,\"salary\":83845.11357786917,\"yoe\":3},\"maxValues\":{\"id\":4693651733,\"birthday\":\"1999-09-17\",\"name\":\"Employee1\",\"age\":24,\"salary\":83845.11357786917,\"yoe\":3},\"nullCount\":{\"id\":0,\"
 [...]
+{"add":{"path":"part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet","partitionValues":{},"size":2316,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":7132772589,\"birthday\":\"1995-09-11\",\"name\":\"Employee2\",\"age\":28,\"salary\":90140.44051385639,\"yoe\":8},\"maxValues\":{\"id\":7132772589,\"birthday\":\"1995-09-11\",\"name\":\"Employee2\",\"age\":28,\"salary\":90140.44051385639,\"yoe\":8},\"nullCount\":{\"id\":0,\"
 [...]
+{"add":{"path":"part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet","partitionValues":{},"size":2455,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":6627278510,\"birthday\":\"2005-01-06\",\"name\":\"Employee3\",\"age\":19,\"salary\":58857.27649436368,\"bonus\":3699.0881,\"yoe\":4,\"last_vacation_time\":\"2024-01-28T08:15:54.648-08:00\"},\"maxValues\":{\"id\":6627278510,\"birthday\":\"2005-01-06\",\"name\":\"Employee3\",
 [...]
+{"add":{"path":"part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet","partitionValues":{},"size":2454,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4786204912,\"birthday\":\"1994-03-10\",\"name\":\"Employee4\",\"age\":29,\"salary\":93646.81222022788,\"bonus\":2334.6675,\"yoe\":5,\"last_vacation_time\":\"2024-01-27T13:15:54.648-08:00\"},\"maxValues\":{\"id\":4786204912,\"birthday\":\"1994-03-10\",\"name\":\"Employee4\",
 [...]
+{"add":{"path":"part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet","partitionValues":{},"size":2302,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2773939764,\"birthday\":\"2005-04-28\",\"name\":\"Employee5\",\"age\":18,\"salary\":66300.05339373322,\"yoe\":3},\"maxValues\":{\"id\":2773939764,\"birthday\":\"2005-04-28\",\"name\":\"Employee5\",\"age\":18,\"salary\":66300.05339373322,\"yoe\":3},\"nullCount\":{\"id\":0,\"
 [...]
+{"add":{"path":"part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet","partitionValues":{},"size":2316,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8333438088,\"birthday\":\"1998-12-11\",\"name\":\"Employee6\",\"age\":25,\"salary\":59219.5257906128,\"yoe\":4},\"maxValues\":{\"id\":8333438088,\"birthday\":\"1998-12-11\",\"name\":\"Employee6\",\"age\":25,\"salary\":59219.5257906128,\"yoe\":4},\"nullCount\":{\"id\":0,\"bi
 [...]
+{"add":{"path":"part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet","partitionValues":{},"size":2317,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8397454007,\"birthday\":\"1998-04-30\",\"name\":\"Employee7\",\"age\":25,\"salary\":61909.733851830584,\"yoe\":8},\"maxValues\":{\"id\":8397454007,\"birthday\":\"1998-04-30\",\"name\":\"Employee7\",\"age\":25,\"salary\":61909.733851830584,\"yoe\":8},\"nullCount\":{\"id\":0,
 [...]
+{"add":{"path":"part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet","partitionValues":{},"size":2455,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8925359945,\"birthday\":\"2002-12-03\",\"name\":\"Employee8\",\"age\":21,\"salary\":76588.05471316943,\"bonus\":3000.0154,\"yoe\":1,\"last_vacation_time\":\"2024-01-25T07:15:54.648-08:00\"},\"maxValues\":{\"id\":8925359945,\"birthday\":\"2002-12-03\",\"name\":\"Employee8\",
 [...]
+{"add":{"path":"part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet","partitionValues":{},"size":2455,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8154788551,\"birthday\":\"2001-05-14\",\"name\":\"Employee9\",\"age\":22,\"salary\":59787.98539015684,\"bonus\":4463.3833,\"yoe\":4,\"last_vacation_time\":\"2024-01-25T03:15:54.648-08:00\"},\"maxValues\":{\"id\":8154788551,\"birthday\":\"2001-05-14\",\"name\":\"Employee9\",
 [...]
+{"add":{"path":"part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet","partitionValues":{},"size":2324,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":5884382356,\"birthday\":\"1998-11-29\",\"name\":\"Employee10\",\"age\":25,\"salary\":51565.91965119349,\"yoe\":9},\"maxValues\":{\"id\":5884382356,\"birthday\":\"1998-11-29\",\"name\":\"Employee10\",\"age\":25,\"salary\":51565.91965119349,\"yoe\":9},\"nullCount\":{\"id\":0,
 [...]
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000001.json b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000001.json
new file mode 100644
index 00000000000..188f2417e6a
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000001.json
@@ -0,0 +1,6 @@
+{"commitInfo":{"timestamp":1706498178162,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputRows":"5","numOutputBytes":"12655"},"engineInfo":"Apache-Spark/3.5.0
 Delta-Lake/3.0.0","txnId":"38049998-65f7-4ed6-8d48-e51c5da502c6"}}
+{"add":{"path":"part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet","partitionValues":{},"size":2301,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":867799346,\"birthday\":\"2003-07-11\",\"name\":\"Employee1\",\"age\":20,\"salary\":87642.55209817083,\"yoe\":4},\"maxValues\":{\"id\":867799346,\"birthday\":\"2003-07-11\",\"name\":\"Employee1\",\"age\":20,\"salary\":87642.55209817083,\"yoe\":4},\"nullCount\":{\"id\":0,\"bi
 [...]
+{"add":{"path":"part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet","partitionValues":{},"size":2317,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":9963151889,\"birthday\":\"2002-10-24\",\"name\":\"Employee2\",\"age\":21,\"salary\":79404.63969727767,\"yoe\":2},\"maxValues\":{\"id\":9963151889,\"birthday\":\"2002-10-24\",\"name\":\"Employee2\",\"age\":21,\"salary\":79404.63969727767,\"yoe\":2},\"nullCount\":{\"id\":0,\"
 [...]
+{"add":{"path":"part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet","partitionValues":{},"size":2302,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2766777393,\"birthday\":\"1998-03-18\",\"name\":\"Employee3\",\"age\":25,\"salary\":92418.21424435009,\"yoe\":9},\"maxValues\":{\"id\":2766777393,\"birthday\":\"1998-03-18\",\"name\":\"Employee3\",\"age\":25,\"salary\":92418.21424435009,\"yoe\":9},\"nullCount\":{\"id\":0,\"
 [...]
+{"add":{"path":"part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet","partitionValues":{},"size":2302,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":6320361986,\"birthday\":\"2004-01-02\",\"name\":\"Employee4\",\"age\":20,\"salary\":97907.76612488469,\"yoe\":3},\"maxValues\":{\"id\":6320361986,\"birthday\":\"2004-01-02\",\"name\":\"Employee4\",\"age\":20,\"salary\":97907.76612488469,\"yoe\":3},\"nullCount\":{\"id\":0,\"
 [...]
+{"add":{"path":"part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet","partitionValues":{},"size":2454,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":7068152260,\"birthday\":\"1996-02-11\",\"name\":\"Employee5\",\"age\":27,\"salary\":79037.77202099308,\"bonus\":4982.215,\"yoe\":9,\"last_vacation_time\":\"2024-01-26T00:16:12.196-08:00\"},\"maxValues\":{\"id\":7068152260,\"birthday\":\"1996-02-11\",\"name\":\"Employee5\",\
 [...]
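
Each file under _delta_log/ above is one committed table version, stored as newline-delimited JSON actions (commitInfo, add, and so on); the stats field of each add action is itself an embedded JSON string carrying per-file row counts and column min/max values. A minimal Python sketch of tallying rows from such a log, using only the standard library (the table path is illustrative):

    import json
    from pathlib import Path

    table = Path("/tmp/employee-delta-table")  # illustrative path

    total_records = 0
    for commit in sorted(table.glob("_delta_log/*.json")):
        for line in commit.read_text().splitlines():
            action = json.loads(line)
            if "add" in action:
                stats_json = action["add"].get("stats")  # embedded JSON string
                if stats_json:
                    total_records += json.loads(stats_json)["numRecords"]

    print("records added across all commits:", total_records)

A production reader must also honor remove actions and checkpoint files; the extension defers that bookkeeping to the Delta Kernel APIs rather than parsing the log by hand.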
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet
new file mode 100644
index 00000000000..d8882215890
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet
new file mode 100644
index 00000000000..f007f82b3b6
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet
new file mode 100644
index 00000000000..549b403d153
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet
new file mode 100644
index 00000000000..006b8482ac8
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet
new file mode 100644
index 00000000000..d3acfdf32fe
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet
new file mode 100644
index 00000000000..d76352414b3
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet
new file mode 100644
index 00000000000..19fc8efe4f4
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet
new file mode 100644
index 00000000000..3490b566213
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet
new file mode 100644
index 00000000000..bdf5e95f168
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet
new file mode 100644
index 00000000000..3cbe3b65a87
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet
new file mode 100644
index 00000000000..ec97497ff59
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet
new file mode 100644
index 00000000000..6145827b330
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet
new file mode 100644
index 00000000000..858a8a10b18
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet
new file mode 100644
index 00000000000..fa13a5f9eaf
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet
new file mode 100644
index 00000000000..13863ff9731
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet
new file mode 100644
index 00000000000..1f314548386
Binary files /dev/null and b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet differ
diff --git a/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt b/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt
new file mode 100644
index 00000000000..8a846d26385
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt
@@ -0,0 +1,2 @@
+delta-spark==3.0.0
+pyspark==3.5.0
\ No newline at end of file
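
The pinned versions above match the engine recorded in the transaction logs (Apache-Spark/3.5.0 with Delta-Lake/3.0.0). A minimal sketch of writing a table with that toolchain; the schema, rows, and output path below are illustrative, not the actual fixture generator:

    from delta import configure_spark_with_delta_pip
    from pyspark.sql import SparkSession

    # Enable Delta Lake support on a local Spark session.
    builder = (
        SparkSession.builder.appName("delta-table-gen")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog",
        )
    )
    spark = configure_spark_with_delta_pip(builder).getOrCreate()

    # Rows shaped like the employee columns seen in the log stats above.
    rows = [(867799346, "2003-07-11", "Employee1", 20, 87642.55, 4)]
    df = spark.createDataFrame(rows, ["id", "birthday", "name", "age", "salary", "yoe"])

    # Each append write produces a new commit file under _delta_log/.
    df.write.format("delta").mode("append").save("/tmp/employee-delta-table")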
diff --git a/pom.xml b/pom.xml
index d97e8708367..d190f405ed1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,7 +230,9 @@
         <module>extensions-contrib/opentelemetry-emitter</module>
         <module>extensions-contrib/kubernetes-overlord-extensions</module>
         <module>extensions-contrib/druid-iceberg-extensions</module>
+        <module>extensions-contrib/druid-deltalake-extensions</module>
         <module>extensions-contrib/spectator-histogram</module>
+
         <!-- distribution packaging -->
         <module>distribution</module>
         <!-- Revised integration tests -->
diff --git a/web-console/assets/delta.png b/web-console/assets/delta.png
new file mode 100644
index 00000000000..db535506c7b
Binary files /dev/null and b/web-console/assets/delta.png differ
diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
index 3594f70c992..a675205fdfc 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
@@ -98,6 +98,7 @@ export type IngestionComboType =
   | 'index_parallel:inline'
   | 'index_parallel:s3'
   | 'index_parallel:azure'
+  | 'index_parallel:delta'
   | 'index_parallel:google'
   | 'index_parallel:hdfs';
 
@@ -136,6 +137,7 @@ export function getIngestionComboType(
       switch (inputSource.type) {
         case 'local':
         case 'http':
+        case 'delta':
         case 'druid':
         case 'inline':
         case 's3':
@@ -170,6 +172,9 @@ export function getIngestionTitle(ingestionType: IngestionComboTypeWithExtra): s
     case 'index_parallel:azure':
       return 'Azure Data Lake';
 
+    case 'index_parallel:delta':
+      return 'Delta Lake';
+
     case 'index_parallel:google':
       return 'Google Cloud Storage';
 
@@ -462,7 +467,7 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
     name: 'inputSource.type',
     label: 'Source type',
     type: 'string',
-    suggestions: ['local', 'http', 'inline', 's3', 'azure', 'google', 'hdfs'],
+    suggestions: ['local', 'http', 'inline', 'delta', 's3', 'azure', 'google', 'hdfs'],
     info: (
       <p>
         Druid connects to raw data through{' '}
@@ -895,6 +900,18 @@ export function getIoConfigFormFields(ingestionComboType: IngestionComboType): F
         inputSourceFilter,
       ];
 
+    case 'index_parallel:delta':
+      return [
+        inputSourceType,
+        {
+          name: 'inputSource.tablePath',
+          label: 'Delta table path',
+          type: 'string',
+          placeholder: '/path/to/deltaTable',
+          required: true,
+        },
+      ];
+
     case 'index_parallel:hdfs':
       return [
         inputSourceType,
@@ -1085,6 +1102,7 @@ export function getIoConfigTuningFormFields(
     case 'index_parallel:s3':
     case 'index_parallel:azure':
     case 'index_parallel:google':
+    case 'index_parallel:delta':
     case 'index_parallel:hdfs':
       return [
         {
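
The single form field above maps directly onto the native delta input source, whose only required property is tablePath. A sketch of the ioConfig shape the console would emit for a parallel batch task (the path is a placeholder; no inputFormat block is shown because the Delta input source reads the table's Parquet files itself):

    import json

    io_config = {
        "type": "index_parallel",
        "inputSource": {
            "type": "delta",
            "tablePath": "/path/to/deltaTable",  # placeholder value
        },
    }
    print(json.dumps(io_config, indent=2))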
diff --git a/web-console/src/druid-models/input-source/input-source.tsx b/web-console/src/druid-models/input-source/input-source.tsx
index 4e0647320e5..01b111b957f 100644
--- a/web-console/src/druid-models/input-source/input-source.tsx
+++ b/web-console/src/druid-models/input-source/input-source.tsx
@@ -55,6 +55,9 @@ export interface InputSource {
   // inline
   data?: string;
 
+  // delta
+  tablePath?: string;
+
   // hdfs
   paths?: string | string[];
 
@@ -111,6 +114,10 @@ export type InputSourceDesc =
       type: 'hdfs';
       paths?: string | string[];
     }
+  | {
+      type: 'delta';
+      tablePath?: string;
+    }
   | {
       type: 'sql';
       database: any;
@@ -158,6 +165,12 @@ export function issueWithInputSource(inputSource: InputSource | undefined): stri
       }
       return;
 
+    case 'delta':
+      if (!inputSource.tablePath) {
+        return 'must have tablePath';
+      }
+      return;
+
     case 'hdfs':
       if (!inputSource.paths) {
         return 'must have paths';
@@ -169,7 +182,18 @@ export function issueWithInputSource(inputSource: InputSource | undefined): stri
   }
 }
 
-const KNOWN_TYPES = ['inline', 'druid', 'http', 'local', 's3', 'azure', 'google', 'hdfs', 'sql'];
+const KNOWN_TYPES = [
+  'inline',
+  'druid',
+  'http',
+  'local',
+  's3',
+  'azure',
+  'delta',
+  'google',
+  'hdfs',
+  'sql',
+];
 export const INPUT_SOURCE_FIELDS: Field<InputSource>[] = [
   // inline
 
@@ -574,6 +598,16 @@ export const INPUT_SOURCE_FIELDS: Field<InputSource>[] = [
     required: true,
   },
 
+  // delta lake
+  {
+    name: 'tablePath',
+    label: 'Delta table path',
+    type: 'string',
+    placeholder: '/path/to/deltaTable',
+    defined: typeIsKnown(KNOWN_TYPES, 'delta'),
+    required: true,
+  },
+
   // sql
   {
     name: 'database.type',
diff --git a/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap b/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap
index f5956786afd..b0a81a1800a 100644
--- a/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap
+++ b/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap
@@ -202,6 +202,20 @@ exports[`LoadDataView matches snapshot batch 1`] = `
               Google Cloud Storage
             </p>
           </Blueprint4.Card>
+          <Blueprint4.Card
+            className="ingestion-card"
+            elevation={1}
+            interactive={true}
+            onClick={[Function]}
+          >
+            <img
+              alt="Ingestion tile for index_parallel:delta"
+              src="/some/base_url/assets/delta.png"
+            />
+            <p>
+              Delta Lake
+            </p>
+          </Blueprint4.Card>
           <Blueprint4.Card
             className="ingestion-card"
             elevation={1}
diff --git a/web-console/src/views/load-data-view/load-data-view.tsx b/web-console/src/views/load-data-view/load-data-view.tsx
index db7e8e0f093..18fdf045a5b 100644
--- a/web-console/src/views/load-data-view/load-data-view.tsx
+++ b/web-console/src/views/load-data-view/load-data-view.tsx
@@ -859,6 +859,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
                 {this.renderIngestionCard('index_parallel:s3')}
                 {this.renderIngestionCard('index_parallel:azure')}
                 {this.renderIngestionCard('index_parallel:google')}
+                {this.renderIngestionCard('index_parallel:delta')}
                 {this.renderIngestionCard('index_parallel:hdfs')}
                 {this.renderIngestionCard('index_parallel:druid')}
                 {this.renderIngestionCard('index_parallel:http')}
@@ -921,6 +922,14 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
           </>
         );
 
+      case 'index_parallel:delta':
+        return (
+          <>
+            <p>Load data from Delta Lake.</p>
+            <p>Data must be stored in the Delta Lake format.</p>
+          </>
+        );
+
       case 'index_parallel:druid':
         return (
           <>
@@ -1009,6 +1018,7 @@ export class LoadDataView extends React.PureComponent<LoadDataViewProps, LoadDat
       case 'index_parallel:s3':
       case 'index_parallel:azure':
       case 'index_parallel:google':
+      case 'index_parallel:delta':
       case 'index_parallel:hdfs':
       case 'kafka':
       case 'kinesis':
diff --git a/web-console/src/views/workbench-view/input-source-step/input-source-info.tsx b/web-console/src/views/workbench-view/input-source-step/input-source-info.tsx
index 2a7e2361d5e..c5dac4c12f9 100644
--- a/web-console/src/views/workbench-view/input-source-step/input-source-info.tsx
+++ b/web-console/src/views/workbench-view/input-source-step/input-source-info.tsx
@@ -69,6 +69,9 @@ export const InputSourceInfo = React.memo(function InputSourceInfo(props: InputS
     case 'google':
      return <p>Load text based, avro, orc, or parquet data from the Google Blobstore.</p>;
 
+    case 'delta':
+      return <p>Load data from Delta Lake.</p>;
+
     case 'hdfs':
       return <p>Load text based, avro, orc, or parquet data from HDFS.</p>;
 
diff --git a/website/.spelling b/website/.spelling
index 640f9887331..227e97160fb 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -63,6 +63,7 @@ DRUIDVERSION
 DataSketches
 DateTime
 DateType
+DeltaLakeInputSource
 dimensionsSpec
 DimensionSpec
 DimensionSpecs
@@ -79,6 +80,7 @@ downsamples
 downsampling
 Dropwizard
 dropwizard
+druid-deltalake-extensions
 DruidInputSource
 DruidSQL
 DynamicConfigProvider
@@ -161,6 +163,7 @@ Kerberos
 KeyStores
 Kinesis
 Kubernetes
+Lakehouse
 LDAPS
 LRU
 LZ4
@@ -525,6 +528,7 @@ SVG
 symlink
 syntaxes
 systemFields
+tablePath
 tiering
 timeseries
 Timeseries
@@ -571,6 +575,7 @@ varchar
 vectorizable
 vectorize
 vectorizeVirtualColumns
+versioned
 versioning
 virtualColumns
 w.r.t.
@@ -795,9 +800,11 @@ multi-server
 BasicDataSource
 LeaderLatch
 2.x
-28.x 
+28.x
+3.0.x
 3.5.x
 3.4.x
+3.5.x.
 AllowAll
 AuthenticationResult
 AuthorizationLoadingLookupTest


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
