This is an automated email from the ASF dual-hosted git repository.
abhishek pushed a commit to branch 29.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/29.0.0 by this push:
new 8438d0fa658 [Backport] Extension to read and ingest Delta Lake tables
(#15755) (#15812)
8438d0fa658 is described below
commit 8438d0fa65888e46ea109fea5311ec885493d63b
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Wed Jan 31 01:37:46 2024 -0800
[Backport] Extension to read and ingest Delta Lake tables (#15755) (#15812)
First commit: clean backport of Extension to read and ingest Delta Lake
tables #15755
Second commit: change version from 30.0.0 to 29.0.0
---
distribution/pom.xml | 4 +
docs/development/extensions-contrib/delta-lake.md | 44 +++
docs/ingestion/input-sources.md | 39 ++-
.../druid-deltalake-extensions/pom.xml | 156 ++++++++++
.../druid/delta/common/DeltaLakeDruidModule.java | 70 +++++
.../apache/druid/delta/input/DeltaInputRow.java | 208 +++++++++++++
.../apache/druid/delta/input/DeltaInputSource.java | 261 +++++++++++++++++
.../druid/delta/input/DeltaInputSourceReader.java | 138 +++++++++
.../org/apache/druid/delta/input/DeltaSplit.java | 71 +++++
.../apache/druid/delta/input/DeltaTimeUtils.java | 59 ++++
.../org/apache/druid/delta/input/RowSerde.java | 158 ++++++++++
.../org.apache.druid.initialization.DruidModule | 16 +
.../druid/delta/input/DeltaInputRowTest.java | 80 +++++
.../druid/delta/input/DeltaInputSourceTest.java | 256 ++++++++++++++++
.../apache/druid/delta/input/DeltaTestUtils.java | 322 +++++++++++++++++++++
.../druid/delta/input/DeltaTimeUtilsTest.java | 80 +++++
.../org/apache/druid/delta/input/RowSerdeTest.java | 45 +++
.../src/test/resources/README.md | 68 +++++
.../src/test/resources/create_delta_table.py | 122 ++++++++
...-4e49-b8e7-46132a57d039-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet.crc | Bin 0 -> 16 bytes
...-40f2-9185-45237f51b9bf-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-4189-8927-97fe1720df8d-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-4a5c-b504-62c7c27702c1-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-475a-afbb-44ae0b9cba30-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-4145-ac40-6ccbc04acc79-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-43b7-87db-448c67a315df-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-444c-8984-6baecf6987ee-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-415f-b28a-a85d960ccb04-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-477d-b5fd-d00a857f7e54-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-4092-8ea2-0ee576ca7327-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-45ab-92d4-24d725cc07e1-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-4f1c-8c07-3dac19b12e91-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-4c02-b256-bc890fadce7e-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
...-4f80-bd63-e369c6335699-c000.snappy.parquet.crc | Bin 0 -> 28 bytes
.../_delta_log/.00000000000000000000.json.crc | Bin 0 -> 72 bytes
.../_delta_log/.00000000000000000001.json.crc | Bin 0 -> 36 bytes
.../_delta_log/00000000000000000000.json | 13 +
.../_delta_log/00000000000000000001.json | 6 +
...0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet | Bin 0 -> 2316 bytes
...c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet | Bin 0 -> 979 bytes
...db98-40f2-9185-45237f51b9bf-c000.snappy.parquet | Bin 0 -> 2316 bytes
...a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet | Bin 0 -> 2301 bytes
...0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet | Bin 0 -> 2455 bytes
...8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet | Bin 0 -> 2317 bytes
...bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet | Bin 0 -> 2454 bytes
...54cb-43b7-87db-448c67a315df-c000.snappy.parquet | Bin 0 -> 2302 bytes
...c414-444c-8984-6baecf6987ee-c000.snappy.parquet | Bin 0 -> 2316 bytes
...5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet | Bin 0 -> 2302 bytes
...faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet | Bin 0 -> 2317 bytes
...e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet | Bin 0 -> 2455 bytes
...b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet | Bin 0 -> 2302 bytes
...2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet | Bin 0 -> 2455 bytes
...e177-4c02-b256-bc890fadce7e-c000.snappy.parquet | Bin 0 -> 2454 bytes
...aaec-4f80-bd63-e369c6335699-c000.snappy.parquet | Bin 0 -> 2324 bytes
.../src/test/resources/requirements.txt | 2 +
pom.xml | 2 +
web-console/assets/delta.png | Bin 0 -> 6527 bytes
.../druid-models/ingestion-spec/ingestion-spec.tsx | 20 +-
.../src/druid-models/input-source/input-source.tsx | 36 ++-
.../__snapshots__/load-data-view.spec.tsx.snap | 14 +
.../src/views/load-data-view/load-data-view.tsx | 10 +
.../input-source-step/input-source-info.tsx | 3 +
website/.spelling | 9 +-
64 files changed, 2303 insertions(+), 9 deletions(-)
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 4376e28056e..0c14b913e3f 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -257,6 +257,8 @@
<argument>org.apache.druid.extensions:druid-kubernetes-extensions</argument>
<argument>-c</argument>
<argument>org.apache.druid.extensions:druid-catalog</argument>
+ <argument>-c</argument>
+ <argument>org.apache.druid.extensions:druid-deltalake-extensions</argument>
<argument>${druid.distribution.pulldeps.opts}</argument>
</arguments>
</configuration>
@@ -453,6 +455,8 @@
 <argument>-c</argument>
 <argument>org.apache.druid.extensions:druid-iceberg-extensions</argument>
+ <argument>-c</argument>
+ <argument>org.apache.druid.extensions:druid-deltalake-extensions</argument>
 <argument>-c</argument>
 <argument>org.apache.druid.extensions.contrib:druid-spectator-histogram</argument>
</arguments>
</configuration>
diff --git a/docs/development/extensions-contrib/delta-lake.md
b/docs/development/extensions-contrib/delta-lake.md
new file mode 100644
index 00000000000..8b1de484971
--- /dev/null
+++ b/docs/development/extensions-contrib/delta-lake.md
@@ -0,0 +1,44 @@
+---
+id: delta-lake
+title: "Delta Lake extension"
+---
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+## Delta Lake extension
+
+
+Delta Lake is an open source storage framework that enables building a
+Lakehouse architecture with various compute engines.
[DeltaLakeInputSource](../../ingestion/input-sources.md#delta-lake-input-source)
lets
+you ingest data stored in a Delta Lake table into Apache Druid. To use the
Delta Lake extension, add the `druid-deltalake-extensions` to the list of
loaded extensions.
+See [Loading extensions](../../configuration/extensions.md#loading-extensions)
for more information.
+
+The Delta input source reads the configured Delta Lake table and extracts all
the underlying delta files in the table's latest snapshot.
+These Delta Lake files are versioned Parquet files.
+
+## Version support
+
+The Delta Lake extension uses the Delta Kernel introduced in Delta Lake 3.0.0,
which is compatible with Apache Spark 3.5.x.
+Older versions are unsupported, so consider upgrading to Delta Lake 3.0.x or
higher to use this extension.
+
+## Known limitations
+
+- This extension relies on the Delta Kernel API and can only read from the
latest Delta table snapshot.
+- Column filtering isn't supported. The extension reads all columns in the
configured table.
\ No newline at end of file
diff --git a/docs/ingestion/input-sources.md b/docs/ingestion/input-sources.md
index 6e1be89c53d..d4693e7925a 100644
--- a/docs/ingestion/input-sources.md
+++ b/docs/ingestion/input-sources.md
@@ -715,6 +715,13 @@ rolled-up datasource `wikipedia_rollup` by grouping on
hour, "countryName", and
to `true` to enable a compatibility mode where the timestampSpec is ignored.
:::
+The [secondary partitioning method](native-batch.md#partitionsspec) determines
the requisite number of concurrent worker tasks that run in parallel to
complete ingestion with the Combining input source.
+Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the
secondary partitioning method:
+- `range` or `single_dim` partitioning: greater than or equal to 1
+- `hashed` or `dynamic` partitioning: greater than or equal to 2
+
+For more information on the `maxNumConcurrentSubTasks` field, see
[Implementation considerations](native-batch.md#implementation-considerations).
+
## SQL input source
The SQL input source is used to read data directly from RDBMS.
@@ -817,7 +824,7 @@ The following is an example of a Combining input source
spec:
## Iceberg input source
:::info
- To use the Iceberg input source, add the `druid-iceberg-extensions` extension.
+To use the Iceberg input source, load the extension
[`druid-iceberg-extensions`](../development/extensions-contrib/iceberg.md).
:::
You use the Iceberg input source to read data stored in the Iceberg table
format. For a given table, the input source scans up to the latest Iceberg
snapshot from the configured Hive catalog. Druid ingests the underlying live
data files using the existing input source formats.
@@ -1006,11 +1013,31 @@ This input source provides the following filters:
`and`, `equals`, `interval`, a
|type|Set this value to `not`.|yes|
|filter|The iceberg filter on which logical NOT is applied|yes|
+## Delta Lake input source
+:::info
+To use the Delta Lake input source, load the extension
[`druid-deltalake-extensions`](../development/extensions-contrib/delta-lake.md).
+:::
-The [secondary partitioning method](native-batch.md#partitionsspec) determines
the requisite number of concurrent worker tasks that run in parallel to
complete ingestion with the Combining input source.
-Set this value in `maxNumConcurrentSubTasks` in `tuningConfig` based on the
secondary partitioning method:
-- `range` or `single_dim` partitioning: greater than or equal to 1
-- `hashed` or `dynamic` partitioning: greater than or equal to 2
+You can use the Delta input source to read data stored in a Delta Lake table.
For a given table, the input source scans
+the latest snapshot from the configured table. Druid ingests the underlying
delta files from the table.
+
+The following is a sample spec:
+
+```json
+...
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "delta",
+ "tablePath": "/delta-table/directory"
+ },
+ }
+}
+```
+
+| Property|Description|Required|
+|---------|-----------|--------|
+| type|Set this value to `delta`.|yes|
+| tablePath|The location of the Delta table.|yes|
-For more information on the `maxNumConcurrentSubTasks` field, see
[Implementation considerations](native-batch.md#implementation-considerations).
diff --git a/extensions-contrib/druid-deltalake-extensions/pom.xml
b/extensions-contrib/druid-deltalake-extensions/pom.xml
new file mode 100644
index 00000000000..14020aa5c8a
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/pom.xml
@@ -0,0 +1,156 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <groupId>org.apache.druid.extensions</groupId>
+ <artifactId>druid-deltalake-extensions</artifactId>
+ <name>druid-deltalake-extensions</name>
+ <description>Delta Lake connector for Druid</description>
+
+ <parent>
+ <artifactId>druid</artifactId>
+ <groupId>org.apache.druid</groupId>
+ <version>29.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <properties>
+ <delta-kernel.version>3.0.0</delta-kernel.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-kernel-api</artifactId>
+ <version>${delta-kernel.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>io.delta</groupId>
+ <artifactId>delta-kernel-defaults</artifactId>
+ <version>${delta-kernel.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client-api</artifactId>
+ <version>${hadoop.compile.version}</version>
+ <scope>compile</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.google.inject</groupId>
+ <artifactId>guice</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>joda-time</groupId>
+ <artifactId>joda-time</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-core</artifactId>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.12.7.1</version>
+ </dependency>
+ <dependency>
+ <groupId>it.unimi.dsi</groupId>
+ <artifactId>fastutil-core</artifactId>
+ <version>8.5.4</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-aws</artifactId>
+ <version>${hadoop.compile.version}</version>
+ <scope>runtime</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.amazonaws</groupId>
+ <artifactId>aws-java-sdk-bundle</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.druid</groupId>
+ <artifactId>druid-processing</artifactId>
+ <version>${project.parent.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.owasp</groupId>
+ <artifactId>dependency-check-maven</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java
new file mode 100644
index 00000000000..70c2b6ff655
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/common/DeltaLakeDruidModule.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.common;
+
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.google.inject.Binder;
+import org.apache.druid.delta.input.DeltaInputSource;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.initialization.DruidModule;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+
+import java.util.Collections;
+import java.util.List;
+
+public class DeltaLakeDruidModule implements DruidModule
+{
+ @Override
+ public List<? extends Module> getJacksonModules()
+ {
+ return Collections.singletonList(
+ new SimpleModule("DeltaLakeDruidModule")
+ .registerSubtypes(
+ new NamedType(DeltaInputSource.class,
DeltaInputSource.TYPE_KEY)
+ )
+ );
+ }
+
+ @Override
+ public void configure(Binder binder)
+ {
+ final Configuration conf = new Configuration();
+ conf.setClassLoader(getClass().getClassLoader());
+
+ ClassLoader currCtxCl = Thread.currentThread().getContextClassLoader();
+ try {
+
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+ FileSystem.get(conf);
+ }
+ catch (Exception ex) {
+ throw DruidException.forPersona(DruidException.Persona.DEVELOPER)
+ .ofCategory(DruidException.Category.UNCATEGORIZED)
+ .build(ex, "Problem during fileSystem class level
initialization");
+ }
+ finally {
+ Thread.currentThread().setContextClassLoader(currCtxCl);
+ }
+ }
+
+}
+
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java
new file mode 100644
index 00000000000..acf909452a0
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputRow.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.types.BinaryType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DecimalType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import it.unimi.dsi.fastutil.objects.Object2IntMap;
+import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.Row;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.error.InvalidInput;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Encodes the row and schema information from the Delta Lake.
+ */
+public class DeltaInputRow implements InputRow
+{
+ private final io.delta.kernel.data.Row row;
+ private final StructType schema;
+ private final Object2IntMap<String> fieldNameToOrdinal = new
Object2IntOpenHashMap<>();
+ private final InputRow delegateRow;
+
+ public DeltaInputRow(io.delta.kernel.data.Row row, InputRowSchema
inputRowSchema)
+ {
+ this.row = row;
+ this.schema = row.getSchema();
+ List<String> fieldNames = this.schema.fieldNames();
+ for (int i = 0; i < fieldNames.size(); ++i) {
+ fieldNameToOrdinal.put(fieldNames.get(i), i);
+ }
+ fieldNameToOrdinal.defaultReturnValue(-1);
+
+ Map<String, Object> theMap = new HashMap<>();
+ for (String fieldName : fieldNames) {
+ theMap.put(fieldName, _getRaw(fieldName));
+ }
+ delegateRow = MapInputRowParser.parse(inputRowSchema, theMap);
+ }
+
+ @Override
+ public List<String> getDimensions()
+ {
+ return delegateRow.getDimensions();
+ }
+
+ @Override
+ public long getTimestampFromEpoch()
+ {
+ return delegateRow.getTimestampFromEpoch();
+ }
+
+ @Override
+ public DateTime getTimestamp()
+ {
+ return delegateRow.getTimestamp();
+ }
+
+ @Override
+ public List<String> getDimension(String dimension)
+ {
+ return delegateRow.getDimension(dimension);
+ }
+
+ @Nullable
+ @Override
+ public Object getRaw(String dimension)
+ {
+ return delegateRow.getRaw(dimension);
+ }
+
+ @Nullable
+ @Override
+ public Number getMetric(String metric)
+ {
+ return delegateRow.getMetric(metric);
+ }
+
+ @Override
+ public int compareTo(Row o)
+ {
+ return this.getTimestamp().compareTo(o.getTimestamp());
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ return o instanceof DeltaInputRow && compareTo((DeltaInputRow) o) == 0;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(row, schema, fieldNameToOrdinal, delegateRow);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DeltaInputRow{" +
+ "row=" + row +
+ ", schema=" + schema +
+ ", fieldNameToOrdinal=" + fieldNameToOrdinal +
+ ", delegateRow=" + delegateRow +
+ '}';
+ }
+
+ public Map<String, Object> getRawRowAsMap()
+ {
+ return RowSerde.convertRowToJsonObject(row);
+ }
+
+ @Nullable
+ private Object _getRaw(String dimension)
+ {
+ StructField field = schema.get(dimension);
+ if (field == null || field.isMetadataColumn()) {
+ return null;
+ }
+
+ int ordinal = fieldNameToOrdinal.getInt(dimension);
+ if (ordinal < 0) {
+ return null;
+ }
+ return getValue(field.getDataType(), row, ordinal);
+ }
+
+ @Nullable
+ private static Object getValue(DataType dataType, io.delta.kernel.data.Row
dataRow, int columnOrdinal)
+ {
+ if (dataRow.isNullAt(columnOrdinal)) {
+ return null;
+ } else if (dataType instanceof BooleanType) {
+ return dataRow.getBoolean(columnOrdinal);
+ } else if (dataType instanceof ByteType) {
+ return dataRow.getByte(columnOrdinal);
+ } else if (dataType instanceof ShortType) {
+ return dataRow.getShort(columnOrdinal);
+ } else if (dataType instanceof IntegerType) {
+ return dataRow.getInt(columnOrdinal);
+ } else if (dataType instanceof DateType) {
+ return DeltaTimeUtils.getSecondsFromDate(dataRow.getInt(columnOrdinal));
+ } else if (dataType instanceof LongType) {
+ return dataRow.getLong(columnOrdinal);
+ } else if (dataType instanceof TimestampType) {
+ return
DeltaTimeUtils.getMillisFromTimestamp(dataRow.getLong(columnOrdinal));
+ } else if (dataType instanceof FloatType) {
+ return dataRow.getFloat(columnOrdinal);
+ } else if (dataType instanceof DoubleType) {
+ return dataRow.getDouble(columnOrdinal);
+ } else if (dataType instanceof StringType) {
+ return dataRow.getString(columnOrdinal);
+ } else if (dataType instanceof BinaryType) {
+ final byte[] arr = dataRow.getBinary(columnOrdinal);
+ final char[] charArray = new char[arr.length];
+ for (int i = 0; i < arr.length; i++) {
+ charArray[i] = (char) (arr[i]);
+ }
+ return String.valueOf(charArray);
+ } else if (dataType instanceof DecimalType) {
+ return dataRow.getDecimal(columnOrdinal).longValue();
+ } else {
+ throw InvalidInput.exception(
+ "Unsupported data type[%s] for fieldName[%s].",
+ dataType,
+ dataRow.getSchema().fieldNames().get(columnOrdinal)
+ );
+ }
+ }
+}
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java
new file mode 100644
index 00000000000..936e2dd70d6
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSource.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.Iterators;
+import com.google.common.primitives.Ints;
+import io.delta.kernel.Scan;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.utils.CloseableIterator;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.utils.Streams;
+import org.apache.hadoop.conf.Configuration;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Input source to ingest data from a Delta Lake.
+ * This input source reads the latest snapshot from a Delta table specified by
{@code tablePath} parameter.
+ * We leverage the Delta Kernel APIs to interact with a Delta table. The
Kernel API abstracts away the
+ * complexities of the Delta protocol itself.
+ * Note: currently, the Kernel table API only supports reading from the latest
snapshot.
+ */
+public class DeltaInputSource implements SplittableInputSource<DeltaSplit>
+{
+ public static final String TYPE_KEY = "delta";
+
+ @JsonProperty
+ private final String tablePath;
+
+ @JsonProperty
+ @Nullable
+ private final DeltaSplit deltaSplit;
+
+ @JsonCreator
+ public DeltaInputSource(
+ @JsonProperty("tablePath") String tablePath,
+ @JsonProperty("deltaSplit") @Nullable DeltaSplit deltaSplit
+ )
+ {
+ if (tablePath == null) {
+ throw InvalidInput.exception("tablePath cannot be null.");
+ }
+ this.tablePath = tablePath;
+ this.deltaSplit = deltaSplit;
+ }
+
+ @Override
+ public boolean needsFormat()
+ {
+ // Only support Parquet
+ return false;
+ }
+
+ /**
+ * Instantiates a {@link DeltaInputSourceReader} to read the Delta table
rows. If a {@link DeltaSplit} is supplied,
+ * the Delta files and schema are obtained from it to instantiate the
reader. Otherwise, a Delta table client is
+ * instantiated with the supplied configuration to read the table.
+ *
+ * @param inputRowSchema schema for {@link
org.apache.druid.data.input.InputRow}
+ * @param inputFormat unused parameter. The input format is always
parquet
+ * @param temporaryDirectory unused parameter
+ */
+ @Override
+ public InputSourceReader reader(
+ InputRowSchema inputRowSchema,
+ @Nullable InputFormat inputFormat,
+ File temporaryDirectory
+ )
+ {
+ final TableClient tableClient = createTableClient();
+ try {
+ final Row scanState;
+ final List<Row> scanRowList;
+
+ if (deltaSplit != null) {
+ scanState = deserialize(tableClient, deltaSplit.getStateRow());
+ scanRowList = deltaSplit.getFiles()
+ .stream()
+ .map(row -> deserialize(tableClient, row))
+ .collect(Collectors.toList());
+ } else {
+ final Table table = Table.forPath(tableClient, tablePath);
+ final Snapshot latestSnapshot = table.getLatestSnapshot(tableClient);
+ final StructType prunedSchema = pruneSchema(
+ latestSnapshot.getSchema(tableClient),
+ inputRowSchema.getColumnsFilter()
+ );
+ final Scan scan =
latestSnapshot.getScanBuilder(tableClient).withReadSchema(tableClient,
prunedSchema).build();
+ final CloseableIterator<FilteredColumnarBatch> scanFiles =
scan.getScanFiles(tableClient);
+
+ scanState = scan.getScanState(tableClient);
+ scanRowList = new ArrayList<>();
+
+ while (scanFiles.hasNext()) {
+ final FilteredColumnarBatch scanFileBatch = scanFiles.next();
+ final CloseableIterator<Row> scanFileRows = scanFileBatch.getRows();
+ scanFileRows.forEachRemaining(scanRowList::add);
+ }
+ }
+ return new DeltaInputSourceReader(
+ Scan.readData(
+ tableClient,
+ scanState,
+ Utils.toCloseableIterator(scanRowList.iterator()),
+ Optional.empty()
+ ),
+ inputRowSchema
+ );
+ }
+ catch (TableNotFoundException e) {
+ throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
+ }
+ catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
  /**
   * Creates one split per scan-file batch of the latest table snapshot. Each split carries the
   * full serialized scan state plus the JSON-serialized scan-file rows of its batch, so a worker
   * can reconstruct its portion of the scan without re-reading the table log.
   */
  @Override
  public Stream<InputSplit<DeltaSplit>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
  {
    if (deltaSplit != null) {
      // can't split a split
      return Stream.of(new InputSplit<>(deltaSplit));
    }

    final TableClient tableClient = createTableClient();
    final Snapshot latestSnapshot;
    try {
      final Table table = Table.forPath(tableClient, tablePath);
      latestSnapshot = table.getLatestSnapshot(tableClient);
    }
    catch (TableNotFoundException e) {
      throw InvalidInput.exception(e, "tablePath[%s] not found.", tablePath);
    }
    final Scan scan = latestSnapshot.getScanBuilder(tableClient).build();
    // scan files iterator for the current snapshot
    final CloseableIterator<FilteredColumnarBatch> scanFilesIterator = scan.getScanFiles(tableClient);

    // The scan state is shared by all splits; serialize it once.
    final Row scanState = scan.getScanState(tableClient);
    final String scanStateStr = RowSerde.serializeRowToJson(scanState);

    Iterator<DeltaSplit> deltaSplitIterator = Iterators.transform(
        scanFilesIterator,
        scanFile -> {
          final CloseableIterator<Row> rows = scanFile.getRows();
          final List<String> fileRows = new ArrayList<>();
          while (rows.hasNext()) {
            fileRows.add(RowSerde.serializeRowToJson(rows.next()));
          }
          return new DeltaSplit(scanStateStr, fileRows);
        }
    );

    return Streams.sequentialStreamFrom(deltaSplitIterator).map(InputSplit::new);
  }
+
+ @Override
+ public int estimateNumSplits(InputFormat inputFormat, @Nullable
SplitHintSpec splitHintSpec)
+ {
+ return Ints.checkedCast(createSplits(inputFormat, splitHintSpec).count());
+ }
+
+ @Override
+ public InputSource withSplit(InputSplit<DeltaSplit> split)
+ {
+ return new DeltaInputSource(
+ tablePath,
+ split.get()
+ );
+ }
+
+ private Row deserialize(TableClient tableClient, String row)
+ {
+ return RowSerde.deserializeRowFromJson(tableClient, row);
+ }
+
+ /**
+ * Utility method to return a pruned schema that contains the given {@code
columns} from
+ * {@code baseSchema} applied by {@code columnsFilter}. This will serve as
an optimization
+ * for table scans if we're interested in reading only a subset of columns
from the Delta Lake table.
+ */
+ private StructType pruneSchema(final StructType baseSchema, final
ColumnsFilter columnsFilter)
+ {
+ final List<String> columnNames = baseSchema.fieldNames();
+ final List<String> fiteredColumnNames = columnNames
+ .stream()
+ .filter(columnsFilter::apply)
+ .collect(Collectors.toList());
+
+ if (fiteredColumnNames.equals(columnNames)) {
+ return baseSchema;
+ }
+ final List<StructField> selectedFields = fiteredColumnNames
+ .stream()
+ .map(baseSchema::get)
+ .collect(Collectors.toList());
+ return new StructType(selectedFields);
+ }
+
  /**
   * @return a table client where the client is initialized with {@link Configuration} class that uses the class's
   * class loader instead of the context classloader. The latter by default doesn't know about the extension classes,
   * so the table client cannot load runtime classes resulting in {@link ClassNotFoundException}.
   */
  private TableClient createTableClient()
  {
    // Swap the thread's context classloader to this extension's classloader for the duration of
    // client construction, then restore it. The order here matters: Configuration and
    // DefaultTableClient.create() both resolve classes via the context classloader.
    final ClassLoader currCtxClassloader = Thread.currentThread().getContextClassLoader();
    try {
      Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
      final Configuration conf = new Configuration();
      return DefaultTableClient.create(conf);
    }
    finally {
      // Always restore the original context classloader, even if client creation throws.
      Thread.currentThread().setContextClassLoader(currCtxClassloader);
    }
  }
+}
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
new file mode 100644
index 00000000000..d0fc4780d00
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaInputSourceReader.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputStats;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import java.io.IOException;
+import java.util.NoSuchElementException;
+
+/**
+ * A reader for the Delta Lake input source. It initializes an iterator {@link
DeltaInputSourceIterator}
+ * for a subset of Delta records given by {@link FilteredColumnarBatch} and
schema {@link InputRowSchema}.
+ *
+ */
+public class DeltaInputSourceReader implements InputSourceReader
+{
+ private final io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch>
filteredColumnarBatchCloseableIterator;
+ private final InputRowSchema inputRowSchema;
+
+ public DeltaInputSourceReader(
+ io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch>
filteredColumnarBatchCloseableIterator,
+ InputRowSchema inputRowSchema
+ )
+ {
+ this.filteredColumnarBatchCloseableIterator =
filteredColumnarBatchCloseableIterator;
+ this.inputRowSchema = inputRowSchema;
+ }
+
+ @Override
+ public CloseableIterator<InputRow> read()
+ {
+ return new
DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator,
inputRowSchema);
+ }
+
+ @Override
+ public CloseableIterator<InputRow> read(InputStats inputStats)
+ {
+ return new
DeltaInputSourceIterator(filteredColumnarBatchCloseableIterator,
inputRowSchema);
+ }
+
+ @Override
+ public CloseableIterator<InputRowListPlusRawValues> sample()
+ {
+
+ CloseableIterator<InputRow> inner = read();
+ return new CloseableIterator<InputRowListPlusRawValues>()
+ {
+ @Override
+ public void close() throws IOException
+ {
+ inner.close();
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ return inner.hasNext();
+ }
+
+ @Override
+ public InputRowListPlusRawValues next()
+ {
+ DeltaInputRow deltaInputRow = (DeltaInputRow) inner.next();
+ return InputRowListPlusRawValues.of(deltaInputRow,
deltaInputRow.getRawRowAsMap());
+ }
+ };
+ }
+
+ private static class DeltaInputSourceIterator implements
CloseableIterator<InputRow>
+ {
+ private final
io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch>
filteredColumnarBatchCloseableIterator;
+
+ private io.delta.kernel.utils.CloseableIterator<Row> currentBatch = null;
+ private final InputRowSchema inputRowSchema;
+
+ public DeltaInputSourceIterator(
+ io.delta.kernel.utils.CloseableIterator<FilteredColumnarBatch>
filteredColumnarBatchCloseableIterator,
+ InputRowSchema inputRowSchema
+ )
+ {
+ this.filteredColumnarBatchCloseableIterator =
filteredColumnarBatchCloseableIterator;
+ this.inputRowSchema = inputRowSchema;
+ }
+
+ @Override
+ public boolean hasNext()
+ {
+ while (currentBatch == null || !currentBatch.hasNext()) {
+ if (!filteredColumnarBatchCloseableIterator.hasNext()) {
+ return false; // No more batches or records to read!
+ }
+ currentBatch = filteredColumnarBatchCloseableIterator.next().getRows();
+ }
+ return true;
+ }
+
+ @Override
+ public InputRow next()
+ {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ Row dataRow = currentBatch.next();
+ return new DeltaInputRow(dataRow, inputRowSchema);
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ filteredColumnarBatchCloseableIterator.close();
+ }
+ }
+}
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java
new file mode 100644
index 00000000000..7ab52cef089
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaSplit.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.List;
+
+/**
+ * An input split of a Delta table containing the following information:
+ * <li>
+ * {@code stateRow} represents the canonical json representation of the latest
snapshot of the Delta table.
+ * </li>
+ * <li>
+ * {@code files} represents the list of files from the latest snapshot.
+ * </li>
+ */
+public class DeltaSplit
+{
+ private final String stateRow;
+ private final List<String> files;
+
+ @JsonCreator
+ public DeltaSplit(
+ @JsonProperty("state") final String stateRow,
+ @JsonProperty("files") final List<String> files
+ )
+ {
+ this.stateRow = stateRow;
+ this.files = files;
+ }
+
+ @JsonProperty("state")
+ public String getStateRow()
+ {
+ return stateRow;
+ }
+
+ @JsonProperty("files")
+ public List<String> getFiles()
+ {
+ return files;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DeltaSplit{" +
+ "stateRow=" + stateRow +
+ ", files=" + files +
+ "}";
+ }
+}
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java
new file mode 100644
index 00000000000..f4742894402
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/DeltaTimeUtils.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+
public class DeltaTimeUtils
{
  // Values are stored relative to UTC and converted through the system default zone;
  // NOTE(review): this makes results depend on the JVM's default timezone — confirm intended.
  private static final ZoneId ZONE_ID = ZoneId.systemDefault();

  /**
   * {@link io.delta.kernel.types.TimestampType} data in Delta Lake tables is stored internally as the number of
   * microseconds since epoch.
   *
   * @param microSecsSinceEpochUTC microseconds since epoch
   * @return Datetime millis corresponding to {@code microSecsSinceEpochUTC}
   */
  public static long getMillisFromTimestamp(final long microSecsSinceEpochUTC)
  {
    // Fix: the original nanoOfSecond expression was "1000 * micros % 1_000_000", which by
    // operator precedence is (1000 * micros) % 1_000_000 == 1000 * (micros % 1000) — it dropped
    // all sub-second precision between 1 ms and 1 s. The correct value is
    // (micros % 1_000_000) * 1000. floorDiv/floorMod keep nanoOfSecond in [0, 1e9) for
    // negative (pre-epoch) timestamps as well, which LocalDateTime.ofEpochSecond requires.
    final LocalDateTime dateTime = LocalDateTime.ofEpochSecond(
        Math.floorDiv(microSecsSinceEpochUTC, 1_000_000) /* epochSecond */,
        (int) (Math.floorMod(microSecsSinceEpochUTC, 1_000_000) * 1_000) /* nanoOfSecond */,
        ZoneOffset.UTC
    );
    return dateTime.atZone(ZONE_ID).toInstant().toEpochMilli();
  }

  /**
   * {@link io.delta.kernel.types.DateType} data in Delta Lake tables is stored internally as the number of
   * days since epoch.
   *
   * @param daysSinceEpochUTC number of days since epoch
   * @return number of seconds corresponding to {@code daysSinceEpochUTC}.
   */
  public static long getSecondsFromDate(final int daysSinceEpochUTC)
  {
    return LocalDate.ofEpochDay(daysSinceEpochUTC).atStartOfDay(ZONE_ID).toEpochSecond();
  }
}
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java
new file mode 100644
index 00000000000..f10ac0574e9
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/main/java/org/apache/druid/delta/input/RowSerde.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.internal.data.DefaultJsonRow;
+import io.delta.kernel.internal.types.TableSchemaSerDe;
+import io.delta.kernel.internal.util.VectorUtils;
+import io.delta.kernel.types.ArrayType;
+import io.delta.kernel.types.BooleanType;
+import io.delta.kernel.types.ByteType;
+import io.delta.kernel.types.DataType;
+import io.delta.kernel.types.DateType;
+import io.delta.kernel.types.DoubleType;
+import io.delta.kernel.types.FloatType;
+import io.delta.kernel.types.IntegerType;
+import io.delta.kernel.types.LongType;
+import io.delta.kernel.types.MapType;
+import io.delta.kernel.types.ShortType;
+import io.delta.kernel.types.StringType;
+import io.delta.kernel.types.StructField;
+import io.delta.kernel.types.StructType;
+import io.delta.kernel.types.TimestampType;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.jackson.DefaultObjectMapper;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Utility class to serialize and deserialize {@link Row} object.
+ * Code borrowed from <a
href="https://github.com/delta-io/delta/blob/master/kernel/examples/table-reader/src/main/java/io/delta/kernel/examples/utils/RowSerDe.java">
+ * RowSerde.java</a>. The only differences between the two classes are the
code style and exception handling in
+ * {@link #convertRowToJsonObject}, where we use {@link
org.apache.druid.error.DruidException} instead of
+ * {@link UnsupportedOperationException}.
+ *
+ */
public class RowSerde
{
  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();

  // Static utility class; not instantiable.
  private RowSerde()
  {
  }

  /**
   * Utility method to serialize a {@link Row} as a JSON string
   */
  public static String serializeRowToJson(Row row)
  {
    Map<String, Object> rowObject = convertRowToJsonObject(row);
    try {
      // The schema is serialized alongside the row values so that deserialization can
      // reconstruct the row without any external schema information.
      Map<String, Object> rowWithSchema = new HashMap<>();
      rowWithSchema.put("schema", TableSchemaSerDe.toJson(row.getSchema()));
      rowWithSchema.put("row", rowObject);
      return OBJECT_MAPPER.writeValueAsString(rowWithSchema);
    }
    catch (JsonProcessingException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Utility method to deserialize a {@link Row} object from the JSON form.
   */
  public static Row deserializeRowFromJson(TableClient tableClient, String jsonRowWithSchema)
  {
    try {
      JsonNode jsonNode = OBJECT_MAPPER.readTree(jsonRowWithSchema);
      // Rebuild the schema first, then parse the row values against it.
      JsonNode schemaNode = jsonNode.get("schema");
      StructType schema =
          TableSchemaSerDe.fromJson(tableClient.getJsonHandler(), schemaNode.asText());
      return parseRowFromJsonWithSchema((ObjectNode) jsonNode.get("row"), schema);
    }
    catch (JsonProcessingException e) {
      throw new RuntimeException(e);
    }
  }

  /**
   * Converts a kernel {@link Row} into a plain {@code Map} suitable for JSON serialization,
   * dispatching on the declared field type. Date and timestamp fields are converted to
   * seconds/millis respectively via {@link DeltaTimeUtils}; nested structs recurse.
   */
  public static Map<String, Object> convertRowToJsonObject(Row row)
  {
    StructType rowType = row.getSchema();
    Map<String, Object> rowObject = new HashMap<>();
    for (int fieldId = 0; fieldId < rowType.length(); fieldId++) {
      StructField field = rowType.at(fieldId);
      DataType fieldType = field.getDataType();
      String name = field.getName();

      // Nulls short-circuit the type dispatch below.
      if (row.isNullAt(fieldId)) {
        rowObject.put(name, null);
        continue;
      }

      Object value;
      if (fieldType instanceof BooleanType) {
        value = row.getBoolean(fieldId);
      } else if (fieldType instanceof ByteType) {
        value = row.getByte(fieldId);
      } else if (fieldType instanceof ShortType) {
        value = row.getShort(fieldId);
      } else if (fieldType instanceof IntegerType) {
        value = row.getInt(fieldId);
      } else if (fieldType instanceof LongType) {
        value = row.getLong(fieldId);
      } else if (fieldType instanceof FloatType) {
        value = row.getFloat(fieldId);
      } else if (fieldType instanceof DoubleType) {
        value = row.getDouble(fieldId);
      } else if (fieldType instanceof DateType) {
        // Dates are stored as days since epoch; converted to seconds here.
        value = DeltaTimeUtils.getSecondsFromDate(row.getInt(fieldId));
      } else if (fieldType instanceof TimestampType) {
        // Timestamps are stored as microseconds since epoch; converted to millis here.
        value = DeltaTimeUtils.getMillisFromTimestamp(row.getLong(fieldId));
      } else if (fieldType instanceof StringType) {
        value = row.getString(fieldId);
      } else if (fieldType instanceof ArrayType) {
        value = VectorUtils.toJavaList(row.getArray(fieldId));
      } else if (fieldType instanceof MapType) {
        value = VectorUtils.toJavaMap(row.getMap(fieldId));
      } else if (fieldType instanceof StructType) {
        Row subRow = row.getStruct(fieldId);
        value = convertRowToJsonObject(subRow);
      } else {
        // Druid-specific deviation from the upstream example: raise a DruidException
        // instead of UnsupportedOperationException.
        throw InvalidInput.exception("Unsupported fieldType[%s] for fieldName[%s]", fieldType, name);
      }

      rowObject.put(name, value);
    }

    return rowObject;
  }

  // Thin wrapper around the kernel's JSON-backed Row implementation.
  private static Row parseRowFromJsonWithSchema(ObjectNode rowJsonNode, StructType rowType)
  {
    return new DefaultJsonRow(rowJsonNode, rowType);
  }
}
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
b/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
new file mode 100644
index 00000000000..b2752bd862c
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/main/resources/META-INF/services/org.apache.druid.initialization.DruidModule
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.druid.delta.common.DeltaLakeDruidModule
\ No newline at end of file
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java
new file mode 100644
index 00000000000..6f68774dbd6
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputRowTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.data.FilteredColumnarBatch;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.client.DefaultTableClient;
+import io.delta.kernel.utils.CloseableIterator;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Optional;
+
public class DeltaInputRowTest
{
  /**
   * Reads the test Delta table via the kernel API and checks that each {@link DeltaInputRow}
   * exposes the expected dimensions, timestamp, and raw values from
   * {@code DeltaTestUtils.EXPECTED_ROWS}.
   */
  @Test
  public void testDeltaInputRow() throws TableNotFoundException, IOException
  {
    final TableClient tableClient = DefaultTableClient.create(new Configuration());
    final Scan scan = DeltaTestUtils.getScan(tableClient);

    CloseableIterator<FilteredColumnarBatch> scanFileIter = scan.getScanFiles(tableClient);
    int totalRecordCount = 0;
    while (scanFileIter.hasNext()) {
      try (CloseableIterator<FilteredColumnarBatch> data =
               Scan.readData(
                   tableClient,
                   scan.getScanState(tableClient),
                   scanFileIter.next().getRows(),
                   Optional.empty()
               )) {
        while (data.hasNext()) {
          FilteredColumnarBatch dataReadResult = data.next();
          // NOTE(review): only the first row of each result batch is validated here
          // (getRows().next() is called once) — presumably each batch holds one record
          // in the test fixture; confirm against create_delta_table.py.
          Row next = dataReadResult.getRows().next();
          DeltaInputRow deltaInputRow = new DeltaInputRow(
              next,
              DeltaTestUtils.FULL_SCHEMA
          );
          Assert.assertNotNull(deltaInputRow);
          Assert.assertEquals(DeltaTestUtils.DIMENSIONS, deltaInputRow.getDimensions());

          Map<String, Object> expectedRow = DeltaTestUtils.EXPECTED_ROWS.get(totalRecordCount);
          for (String key : expectedRow.keySet()) {
            if (DeltaTestUtils.FULL_SCHEMA.getTimestampSpec().getTimestampColumn().equals(key)) {
              // Expected timestamps appear to be stored in seconds; converted to millis for
              // comparison — confirm against DeltaTestUtils.
              final long expectedMillis = ((Long) expectedRow.get(key)) * 1000;
              Assert.assertEquals(expectedMillis, deltaInputRow.getTimestampFromEpoch());
            } else {
              Assert.assertEquals(expectedRow.get(key), deltaInputRow.getRaw(key));
            }
          }
          totalRecordCount += 1;
        }
      }
    }
    Assert.assertEquals(DeltaTestUtils.EXPECTED_ROWS.size(), totalRecordCount);
  }
}
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java
new file mode 100644
index 00000000000..24b1096abe1
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaInputSourceTest.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowListPlusRawValues;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.hamcrest.MatcherAssert;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
public class DeltaInputSourceTest
{
  @Before
  public void setUp()
  {
    // NOTE(review): setting "user.timezone" after JVM startup may not affect
    // TimeZone.getDefault() if the default has already been initialized — confirm this
    // actually pins the tests to UTC (e.g. via TimeZone.setDefault or a JVM arg).
    System.setProperty("user.timezone", "UTC");
  }

  /**
   * Samples the full test table and verifies raw values match EXPECTED_ROWS exactly.
   */
  @Test
  public void testSampleDeltaTable() throws IOException
  {
    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
    final InputSourceReader inputSourceReader = deltaInputSource.reader(DeltaTestUtils.FULL_SCHEMA, null, null);

    List<InputRowListPlusRawValues> actualSampledRows = sampleAllRows(inputSourceReader);
    Assert.assertEquals(DeltaTestUtils.EXPECTED_ROWS.size(), actualSampledRows.size());

    for (int idx = 0; idx < DeltaTestUtils.EXPECTED_ROWS.size(); idx++) {
      Map<String, Object> expectedRow = DeltaTestUtils.EXPECTED_ROWS.get(idx);
      InputRowListPlusRawValues actualSampledRow = actualSampledRows.get(idx);
      Assert.assertNull(actualSampledRow.getParseException());

      Map<String, Object> actualSampledRawVals = actualSampledRow.getRawValues();
      Assert.assertNotNull(actualSampledRawVals);
      Assert.assertNotNull(actualSampledRow.getRawValuesList());
      Assert.assertEquals(1, actualSampledRow.getRawValuesList().size());

      for (String key : expectedRow.keySet()) {
        if (DeltaTestUtils.FULL_SCHEMA.getTimestampSpec().getTimestampColumn().equals(key)) {
          // Raw sampled values are unparsed, so the timestamp is compared as-is
          // (no seconds-to-millis conversion, unlike the read tests below).
          final long expectedMillis = (Long) expectedRow.get(key);
          Assert.assertEquals(expectedMillis, actualSampledRawVals.get(key));
        } else {
          Assert.assertEquals(expectedRow.get(key), actualSampledRawVals.get(key));
        }
      }
    }
  }

  /**
   * Reads the full table with the complete schema and validates every row.
   */
  @Test
  public void testReadAllDeltaTable() throws IOException
  {
    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
    final InputSourceReader inputSourceReader = deltaInputSource.reader(
        DeltaTestUtils.FULL_SCHEMA,
        null,
        null
    );
    final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
    validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.FULL_SCHEMA);
  }

  /**
   * Reads the table with a pruned schema (SCHEMA_1); filtered-out columns must come back null.
   */
  @Test
  public void testReadAllDeltaTableSubSchema1() throws IOException
  {
    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
    final InputSourceReader inputSourceReader = deltaInputSource.reader(
        DeltaTestUtils.SCHEMA_1,
        null,
        null
    );
    final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
    validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.SCHEMA_1);
  }

  /**
   * Same as above with a different pruned schema (SCHEMA_2).
   */
  @Test
  public void testReadAllDeltaTableWithSubSchema2() throws IOException
  {
    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
    final InputSourceReader inputSourceReader = deltaInputSource.reader(
        DeltaTestUtils.SCHEMA_2,
        null,
        null
    );
    final List<InputRow> actualReadRows = readAllRows(inputSourceReader);
    validateRows(DeltaTestUtils.EXPECTED_ROWS, actualReadRows, DeltaTestUtils.SCHEMA_2);
  }

  /**
   * Verifies split creation, and that a split-scoped input source refuses to split further
   * (returns exactly itself).
   */
  @Test
  public void testDeltaLakeWithCreateSplits()
  {
    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
    final List<InputSplit<DeltaSplit>> splits = deltaInputSource.createSplits(null, null)
                                                                .collect(Collectors.toList());
    Assert.assertEquals(DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.size(), splits.size());

    for (InputSplit<DeltaSplit> split : splits) {
      final DeltaSplit deltaSplit = split.get();
      final DeltaInputSource deltaInputSourceWithSplit = new DeltaInputSource(
          DeltaTestUtils.DELTA_TABLE_PATH,
          deltaSplit
      );
      List<InputSplit<DeltaSplit>> splitsResult = deltaInputSourceWithSplit.createSplits(null, null)
                                                                           .collect(Collectors.toList());
      Assert.assertEquals(1, splitsResult.size());
      Assert.assertEquals(deltaSplit, splitsResult.get(0).get());
    }
  }

  /**
   * Reads each split independently and checks its rows against the per-split expectations.
   */
  @Test
  public void testDeltaLakeWithReadSplits() throws IOException
  {
    final DeltaInputSource deltaInputSource = new DeltaInputSource(DeltaTestUtils.DELTA_TABLE_PATH, null);
    final List<InputSplit<DeltaSplit>> splits = deltaInputSource.createSplits(null, null)
                                                                .collect(Collectors.toList());
    Assert.assertEquals(DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.size(), splits.size());

    for (int idx = 0; idx < splits.size(); idx++) {
      final InputSplit<DeltaSplit> split = splits.get(idx);
      final DeltaSplit deltaSplit = split.get();
      final DeltaInputSource deltaInputSourceWithSplit = new DeltaInputSource(
          DeltaTestUtils.DELTA_TABLE_PATH,
          deltaSplit
      );
      final InputSourceReader inputSourceReader = deltaInputSourceWithSplit.reader(
          DeltaTestUtils.FULL_SCHEMA,
          null,
          null
      );
      final List<InputRow> actualRowsInSplit = readAllRows(inputSourceReader);
      final List<Map<String, Object>> expectedRowsInSplit = DeltaTestUtils.SPLIT_TO_EXPECTED_ROWS.get(idx);
      validateRows(expectedRowsInSplit, actualRowsInSplit, DeltaTestUtils.FULL_SCHEMA);
    }
  }

  @Test
  public void testNullTable()
  {
    MatcherAssert.assertThat(
        Assert.assertThrows(
            DruidException.class,
            () -> new DeltaInputSource(null, null)
        ),
        DruidExceptionMatcher.invalidInput().expectMessageIs(
            "tablePath cannot be null."
        )
    );
  }

  @Test
  public void testSplitNonExistentTable()
  {
    final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null);

    MatcherAssert.assertThat(
        Assert.assertThrows(
            DruidException.class,
            () -> deltaInputSource.createSplits(null, null)
        ),
        DruidExceptionMatcher.invalidInput().expectMessageIs(
            "tablePath[non-existent-table] not found."
        )
    );
  }

  @Test
  public void testReadNonExistentTable()
  {
    final DeltaInputSource deltaInputSource = new DeltaInputSource("non-existent-table", null);

    MatcherAssert.assertThat(
        Assert.assertThrows(
            DruidException.class,
            () -> deltaInputSource.reader(null, null, null)
        ),
        DruidExceptionMatcher.invalidInput().expectMessageIs(
            "tablePath[non-existent-table] not found."
        )
    );
  }

  // Drains the reader's sample() iterator into a list.
  private List<InputRowListPlusRawValues> sampleAllRows(InputSourceReader reader) throws IOException
  {
    List<InputRowListPlusRawValues> rows = new ArrayList<>();
    try (CloseableIterator<InputRowListPlusRawValues> iterator = reader.sample()) {
      iterator.forEachRemaining(rows::add);
    }
    return rows;
  }

  // Drains the reader's read() iterator into a list.
  private List<InputRow> readAllRows(InputSourceReader reader) throws IOException
  {
    final List<InputRow> rows = new ArrayList<>();
    try (CloseableIterator<InputRow> iterator = reader.read()) {
      iterator.forEachRemaining(rows::add);
    }
    return rows;
  }

  /**
   * Compares actual input rows against expected maps. Columns rejected by the schema's
   * ColumnsFilter must be absent (null); the timestamp column is compared in millis
   * (expected values are in seconds).
   */
  private void validateRows(
      final List<Map<String, Object>> expectedRows,
      final List<InputRow> actualReadRows,
      final InputRowSchema schema
  )
  {
    Assert.assertEquals(expectedRows.size(), actualReadRows.size());

    for (int idx = 0; idx < expectedRows.size(); idx++) {
      final Map<String, Object> expectedRow = expectedRows.get(idx);
      final InputRow actualInputRow = actualReadRows.get(idx);
      for (String key : expectedRow.keySet()) {
        if (!schema.getColumnsFilter().apply(key)) {
          // Pruned columns must not survive the read.
          Assert.assertNull(actualInputRow.getRaw(key));
        } else {
          if (schema.getTimestampSpec().getTimestampColumn().equals(key)) {
            final long expectedMillis = (Long) expectedRow.get(key) * 1000;
            Assert.assertEquals(expectedMillis, actualInputRow.getTimestampFromEpoch());
            Assert.assertEquals(DateTimes.utc(expectedMillis), actualInputRow.getTimestamp());
          } else {
            Assert.assertEquals(expectedRow.get(key), actualInputRow.getRaw(key));
          }
        }
      }
    }
  }
}
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java
new file mode 100644
index 00000000000..180adaefcfb
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTestUtils.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import io.delta.kernel.Scan;
+import io.delta.kernel.ScanBuilder;
+import io.delta.kernel.Snapshot;
+import io.delta.kernel.Table;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.client.TableClient;
+import io.delta.kernel.types.StructType;
+import org.apache.druid.data.input.ColumnsFilter;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.DoubleDimensionSchema;
+import org.apache.druid.data.input.impl.FloatDimensionSchema;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Refer to
extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md to
generate the
+ * sample Delta Lake table used in the unit tests.
+ */
+public class DeltaTestUtils
+{
+ /**
+ * The Delta table path used by unit tests.
+ */
+ public static final String DELTA_TABLE_PATH =
"src/test/resources/employee-delta-table";
+ /**
+ * The list of dimensions in the Delta table {@link #DELTA_TABLE_PATH}.
+ */
+ public static final List<String> DIMENSIONS = ImmutableList.of(
+ "id",
+ "birthday",
+ "name",
+ "age",
+ "salary",
+ "bonus",
+ "yoe",
+ "is_fulltime",
+ "last_vacation_time"
+ );
+
+ /**
+ * The expected set of rows from the first checkpoint file {@code
DELTA_TABLE_PATH/_delta_log/00000000000000000000.json}
+ */
+ private static final List<Map<String, Object>> SPLIT_0_EXPECTED_ROWS = new
ArrayList<>(
+ ImmutableList.of(
+ ImmutableMap.of(
+ "birthday", 1057881600L,
+ "name", "Employee1",
+ "id", 867799346L,
+ "salary", 87642.55209817083,
+ "age", (short) 20,
+ "yoe", 4
+ ),
+ ImmutableMap.of(
+ "birthday", 1035417600L,
+ "is_fulltime", false,
+ "name", "Employee2",
+ "id", 9963151889L,
+ "salary", 79404.63969727767,
+ "age", (short) 21,
+ "yoe", 2
+ ),
+ ImmutableMap.of(
+ "birthday", 890179200L,
+ "name", "Employee3",
+ "id", 2766777393L,
+ "salary", 92418.21424435009,
+ "age", (short) 25,
+ "yoe", 9
+ ),
+ ImmutableMap.of(
+ "birthday", 1073001600L,
+ "name", "Employee4",
+ "id", 6320361986L,
+ "salary", 97907.76612488469,
+ "age", (short) 20,
+ "yoe", 3
+ ),
+ ImmutableMap.of(
+ "birthday", 823996800L,
+ "is_fulltime", true,
+ "bonus", 4982.215f,
+ "name", "Employee5",
+ "id", 7068152260L,
+ "salary", 79037.77202099308,
+ "last_vacation_time", 1706256972000L,
+ "age", (short) 27,
+ "yoe", 9
+ )
+ )
+ );
+
+ /**
+ * The expected rows from second checkpoint file {@code
DELTA_TABLE_PATH/_delta_log/00000000000000000001.json}
+ */
+ private static final List<Map<String, Object>> SPLIT_1_EXPECTED_ROWS = new
ArrayList<>(
+ ImmutableList.of(
+ ImmutableMap.of(
+ "birthday", 937526400L,
+ "is_fulltime", false,
+ "name", "Employee1",
+ "id", 4693651733L,
+ "salary", 83845.11357786917,
+ "age", (short) 24,
+ "yoe", 3
+ ),
+ ImmutableMap.of(
+ "birthday", 810777600L,
+ "is_fulltime", false,
+ "name", "Employee2",
+ "id", 7132772589L,
+ "salary", 90140.44051385639,
+ "age", (short) 28,
+ "yoe", 8
+ ),
+ ImmutableMap.of(
+ "birthday", 1104969600L,
+ "is_fulltime", true,
+ "bonus", 3699.0881f,
+ "name", "Employee3",
+ "id", 6627278510L,
+ "salary", 58857.27649436368,
+ "last_vacation_time", 1706458554000L,
+ "age", (short) 19,
+ "yoe", 4
+ ),
+ ImmutableMap.of(
+ "birthday", 763257600L,
+ "is_fulltime", true,
+ "bonus", 2334.6675f,
+ "name", "Employee4",
+ "id", 4786204912L,
+ "salary", 93646.81222022788,
+ "last_vacation_time", 1706390154000L,
+ "age", (short) 29,
+ "yoe", 5
+ ),
+ ImmutableMap.of(
+ "birthday", 1114646400L,
+ "name", "Employee5",
+ "id", 2773939764L,
+ "salary", 66300.05339373322,
+ "age", (short) 18,
+ "yoe", 3
+ ),
+ ImmutableMap.of(
+ "birthday", 913334400L,
+ "is_fulltime", false,
+ "name", "Employee6",
+ "id", 8333438088L,
+ "salary", 59219.5257906128,
+ "age", (short) 25,
+ "yoe", 4
+ ),
+ ImmutableMap.of(
+ "birthday", 893894400L,
+ "is_fulltime", false,
+ "name", "Employee7",
+ "id", 8397454007L,
+ "salary", 61909.733851830584,
+ "age", (short) 25,
+ "yoe", 8
+ ),
+ ImmutableMap.of(
+ "birthday", 1038873600L,
+ "is_fulltime", true,
+ "bonus", 3000.0154f,
+ "name", "Employee8",
+ "id", 8925359945L,
+ "salary", 76588.05471316943,
+ "last_vacation_time", 1706195754000L,
+ "age", (short) 21,
+ "yoe", 1
+ ),
+ ImmutableMap.of(
+ "birthday", 989798400L,
+ "is_fulltime", true,
+ "bonus", 4463.3833f,
+ "name", "Employee9",
+ "id", 8154788551L,
+ "salary", 59787.98539015684,
+ "last_vacation_time", 1706181354000L,
+ "age", (short) 22,
+ "yoe", 4
+ ),
+ ImmutableMap.of(
+ "birthday", 912297600L,
+ "is_fulltime", false,
+ "name", "Employee10",
+ "id", 5884382356L,
+ "salary", 51565.91965119349,
+ "age", (short) 25,
+ "yoe", 9
+ )
+ )
+ );
+
+ /**
+ * Mapping of checkpoint file identifier to the list of expected rows in
that checkpoint.
+ */
+ public static final Map<Integer, List<Map<String, Object>>>
SPLIT_TO_EXPECTED_ROWS = new HashMap<>(
+ ImmutableMap.of(
+ 0, SPLIT_0_EXPECTED_ROWS,
+ 1, SPLIT_1_EXPECTED_ROWS
+ )
+ );
+
+ /**
+ * Complete set of expected rows across all checkpoint files for {@link
#DELTA_TABLE_PATH}.
+ */
+ public static final List<Map<String, Object>> EXPECTED_ROWS =
SPLIT_TO_EXPECTED_ROWS.values().stream()
+
.flatMap(List::stream)
+
.collect(Collectors.toList());
+
+ /**
+ * The Druid schema used for ingestion of {@link #DELTA_TABLE_PATH}.
+ */
+ public static final InputRowSchema FULL_SCHEMA = new InputRowSchema(
+ new TimestampSpec("birthday", "posix", null),
+ new DimensionsSpec(
+ ImmutableList.of(
+ new LongDimensionSchema("id"),
+ new LongDimensionSchema("birthday"),
+ new StringDimensionSchema("name"),
+ new LongDimensionSchema("age"),
+ new DoubleDimensionSchema("salary"),
+ new FloatDimensionSchema("bonus"),
+ new LongDimensionSchema("yoe"),
+ new StringDimensionSchema("is_fulltime"),
+ new LongDimensionSchema("last_vacation_time")
+ )
+ ),
+ ColumnsFilter.all()
+ );
+
+ /**
+ * Similar to {@link #FULL_SCHEMA}, but with a smaller set of columns with
an inclusion filter applied.
+ */
+ public static final InputRowSchema SCHEMA_1 = new InputRowSchema(
+ new TimestampSpec("birthday", "posix", null),
+ new DimensionsSpec(
+ ImmutableList.of(
+ new LongDimensionSchema("id"),
+ new LongDimensionSchema("birthday"),
+ new StringDimensionSchema("name"),
+ new LongDimensionSchema("age"),
+ new DoubleDimensionSchema("salary"),
+ new FloatDimensionSchema("bonus"),
+ new LongDimensionSchema("yoe"),
+ new StringDimensionSchema("is_fulltime"),
+ new LongDimensionSchema("last_vacation_time")
+ )
+ ),
+ ColumnsFilter.inclusionBased(ImmutableSet.of("id", "birthday", "name",
"is_fulltime"))
+ );
+
+ /**
+ * Similar to {@link #FULL_SCHEMA}, but with a smaller set of columns with
an exclusion filter applied. A non-existent
+ * column is added to the exclusion filter - it should silently get thrown
away.
+ */
+ public static final InputRowSchema SCHEMA_2 = new InputRowSchema(
+ new TimestampSpec("birthday", "posix", null),
+ new DimensionsSpec(
+ ImmutableList.of(
+ new LongDimensionSchema("id"),
+ new LongDimensionSchema("birthday"),
+ new StringDimensionSchema("name"),
+ new LongDimensionSchema("age"),
+ new DoubleDimensionSchema("salary"),
+ new FloatDimensionSchema("bonus"),
+ new LongDimensionSchema("yoe"),
+ new StringDimensionSchema("is_fulltime"),
+ new LongDimensionSchema("last_vacation_time")
+ )
+ ),
+ ColumnsFilter.exclusionBased(ImmutableSet.of("last_vacation_time",
"bonus", "non_existent_column"))
+ );
+
+ /**
+ * A simple wrapper that builds the table scan for {@link #DELTA_TABLE_PATH}
meant to be used in tests.
+ */
+ public static Scan getScan(final TableClient tableClient) throws
TableNotFoundException
+ {
+ final Table table = Table.forPath(tableClient, DELTA_TABLE_PATH);
+ final Snapshot snapshot = table.getLatestSnapshot(tableClient);
+ final StructType readSchema = snapshot.getSchema(tableClient);
+ final ScanBuilder scanBuilder = snapshot.getScanBuilder(tableClient)
+ .withReadSchema(tableClient,
readSchema);
+ return scanBuilder.build();
+ }
+}
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTimeUtilsTest.java
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTimeUtilsTest.java
new file mode 100644
index 00000000000..de78ed97aa3
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/DeltaTimeUtilsTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.time.Instant;
+
+public class DeltaTimeUtilsTest
+{
+ @Before
+ public void setUp()
+ {
+ System.setProperty("user.timezone", "UTC");
+ }
+
+ @Test
+ public void testTimestampValue()
+ {
+ Assert.assertEquals(
+ Instant.parse("2018-02-02T00:28:02.000Z"),
+ Instant.ofEpochMilli(
+ DeltaTimeUtils.getMillisFromTimestamp(
+ Instant.parse("2018-02-02T00:28:02.000Z").toEpochMilli() *
1_000
+ )
+ )
+ );
+
+ Assert.assertEquals(
+ Instant.parse("2024-01-31T00:58:03.000Z"),
+ Instant.ofEpochMilli(
+ DeltaTimeUtils.getMillisFromTimestamp(
+ Instant.parse("2024-01-31T00:58:03.002Z").toEpochMilli() *
1_000
+ )
+ )
+ );
+ }
+
+ @Test
+ public void testDateTimeValue()
+ {
+ Assert.assertEquals(
+ Instant.parse("2020-02-01T00:00:00.000Z"),
+ Instant.ofEpochSecond(
+ DeltaTimeUtils.getSecondsFromDate(
+ (int)
Intervals.of("1970-01-01/2020-02-01").toDuration().getStandardDays()
+ )
+ )
+ );
+
+ Assert.assertEquals(
+ Instant.parse("2024-01-01T00:00:00.000Z"),
+ Instant.ofEpochSecond(
+ DeltaTimeUtils.getSecondsFromDate(
+ (int)
Intervals.of("1970-01-01/2024-01-01T02:23:00").toDuration().getStandardDays()
+ )
+ )
+ );
+ }
+}
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java
new file mode 100644
index 00000000000..eb06f532a02
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/test/java/org/apache/druid/delta/input/RowSerdeTest.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.delta.input;
+
+import io.delta.kernel.Scan;
+import io.delta.kernel.TableNotFoundException;
+import io.delta.kernel.data.Row;
+import io.delta.kernel.defaults.client.DefaultTableClient;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class RowSerdeTest
+{
+ @Test
+ public void testSerializeDeserializeRoundtrip() throws TableNotFoundException
+ {
+ final DefaultTableClient tableClient = DefaultTableClient.create(new
Configuration());
+ final Scan scan = DeltaTestUtils.getScan(tableClient);
+ final Row scanState = scan.getScanState(tableClient);
+
+ final String rowJson = RowSerde.serializeRowToJson(scanState);
+ final Row row = RowSerde.deserializeRowFromJson(tableClient, rowJson);
+
+ Assert.assertEquals(scanState.getSchema(), row.getSchema());
+ }
+
+}
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md
new file mode 100644
index 00000000000..c524adec4b5
--- /dev/null
+++ b/extensions-contrib/druid-deltalake-extensions/src/test/resources/README.md
@@ -0,0 +1,68 @@
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one
+ ~ or more contributor license agreements. See the NOTICE file
+ ~ distributed with this work for additional information
+ ~ regarding copyright ownership. The ASF licenses this file
+ ~ to you under the Apache License, Version 2.0 (the
+ ~ "License"); you may not use this file except in compliance
+ ~ with the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing,
+ ~ software distributed under the License is distributed on an
+ ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ ~ KIND, either express or implied. See the License for the
+ ~ specific language governing permissions and limitations
+ ~ under the License.
+ -->
+
+### Generate Delta Table for Unit Tests
+
+To test Delta Lake ingestion, use the Python script `create_delta_table.py` to
generate a sample Delta table.
+Create a conda env `delta_test` with all the requirements specified in
`requirements.txt` installed in the
+environment:
+```shell
+conda create --name delta_test --file requirements.txt
+```
+
+To activate the environment:
+
+```shell
+conda activate delta_test
+```
+
+From the conda environment, you can run the python script:
+
+```python
+python3 create_delta_table.py
+```
+
+By default, the script uses `overwrite` mode to generate 10 random records and
writes the
+Delta table to `resources/employee-delta-table`. You can override the defaults
by supplying the command line arguments:
+
+```shell
+python3 create_delta_table.py -h
+
+usage: create_delta_table.py [-h] [--save_mode {append,overwrite}]
[--save_path SAVE_PATH] [--num_records NUM_RECORDS]
+
+Script to write a Delta Lake table.
+
+optional arguments:
+ -h, --help show this help message and exit
+ --save_mode {append,overwrite}
+                        Specify write mode (append/overwrite) (default: overwrite)
+ --save_path SAVE_PATH
+ Save path for Delta table (default:
<DRUID_BASE_PATH>/druid/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table)
+ --num_records NUM_RECORDS
+ Specify number of Delta records to write (default: 10)
+```
+
+The test data in `resources/employee-delta-table` was generated by:
+```shell
+python3 create_delta_table.py
+python3 create_delta_table.py --num_records=5 --save_mode=append
+```
+
+This creates a total of 15 Delta records across two transactional commits. The
resulting Delta table is checked in
+to the repo. The expected rows in `DeltaTestUtils.java` are updated accordingly.
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py
new file mode 100755
index 00000000000..ab9ec87fb00
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/create_delta_table.py
@@ -0,0 +1,122 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+
+import argparse
+import delta
+import pyspark
+from pyspark.sql.types import StructType, StructField, ShortType, StringType,
TimestampType, LongType, IntegerType, DoubleType, FloatType, DateType,
BooleanType
+from datetime import datetime, timedelta
+import random
+
+
+def config_spark_with_delta_lake():
+ builder = (
+ pyspark.sql.SparkSession.builder.appName("DeltaLakeApp")
+ .config("spark.sql.extensions",
"io.delta.sql.DeltaSparkSessionExtension")
+ .config(
+ "spark.sql.catalog.spark_catalog",
+ "org.apache.spark.sql.delta.catalog.DeltaCatalog",
+ )
+ )
+ spark = delta.configure_spark_with_delta_pip(builder).getOrCreate()
+ spark.sparkContext.setLogLevel("ERROR")
+ return spark
+
+
+def create_dataset(num_records):
+ """
+ Generate a mock employee dataset with different datatypes for testing
purposes.
+
+ Parameters:
+ - num_records (int): Number of records to generate.
+
+ Returns:
+ - Tuple: A tuple containing a list of records and the corresponding schema.
+ - List of Records: Each record is a tuple representing a row of data.
+ - StructType: The schema defining the structure of the records.
+
+ Example:
+ ```python
+ data, schema = create_dataset(10)
+ ```
+ """
+ schema = StructType([
+ StructField("id", LongType(), False),
+ StructField("birthday", DateType(), False),
+ StructField("name", StringType(), True),
+ StructField("age", ShortType(), True),
+ StructField("salary", DoubleType(), True),
+ StructField("bonus", FloatType(), True),
+ StructField("yoe", IntegerType(), True),
+ StructField("is_fulltime", BooleanType(), True),
+ StructField("last_vacation_time", TimestampType(), True)
+ ])
+
+ data = []
+ current_date = datetime.now()
+
+ for i in range(num_records):
+ birthday = current_date - timedelta(days=random.randint(365 * 18, 365
* 30))
+ age = (current_date - birthday).days // 365
+ is_fulltime = random.choice([True, False, None])
+ record = (
+ random.randint(1, 10000000000),
+ birthday,
+ f"Employee{i+1}",
+ age,
+ random.uniform(50000, 100000),
+ random.uniform(1000, 5000) if is_fulltime else None,
+ random.randint(1, min(20, age - 15)),
+ is_fulltime,
+ datetime.now() - timedelta(hours=random.randint(1, 90)) if
is_fulltime else None,
+ )
+ data.append(record)
+ return data, schema
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Script to write a Delta Lake
table.",
+
formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+
+ parser.add_argument('--save_mode', choices=('append', 'overwrite'),
default="overwrite",
+ help="Specify write mode (append/overwrite)")
+ parser.add_argument('--save_path', default=os.path.join(os.getcwd(),
"employee-delta-table"),
+ help="Save path for Delta table")
+ parser.add_argument('--num_records', type=int, default=10,
+ help="Specify number of Delta records to write")
+
+ args = parser.parse_args()
+
+ save_mode = args.save_mode
+ save_path = args.save_path
+ num_records = args.num_records
+
+ spark = config_spark_with_delta_lake()
+
+ data, schema = create_dataset(num_records=num_records)
+ df = spark.createDataFrame(data, schema=schema)
+ df.write.format("delta").mode(save_mode).save(save_path)
+
+ df.show()
+
+ print(f"Generated Delta records to {save_path} in {save_mode} mode with
{num_records} records.")
+
+
+if __name__ == "__main__":
+ main()
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..33d75bafdc5
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..3bdcd64c34e
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..f7e53b3258b
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..7de25e042fc
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..e6c6dc8bf9e
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..693057b9126
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..81f3070f023
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..3a8e19d5f78
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..16b0da7f9b2
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..2f8bfc23a92
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..b11e9d379d1
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..a0daef3f2b6
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..96e2c291bf2
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..ba21f2ab460
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..c55ee1b40bc
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet.crc
new file mode 100644
index 00000000000..57a765a96ee
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/.part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000000.json.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000000.json.crc
new file mode 100644
index 00000000000..75a1c6db2e3
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000000.json.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000001.json.crc
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000001.json.crc
new file mode 100644
index 00000000000..e026fad26ba
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/.00000000000000000001.json.crc
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000000.json
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000000.json
new file mode 100644
index 00000000000..4a07c503c43
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000000.json
@@ -0,0 +1,13 @@
+{"commitInfo":{"timestamp":1706498159640,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]"},"isolationLevel":"Serializable","isBlindAppend":false,"operationMetrics":{"numFiles":"10","numOutputRows":"10","numOutputBytes":"23710"},"engineInfo":"Apache-Spark/3.5.0
Delta-Lake/3.0.0","txnId":"5557f0e2-6cdc-4249-bd19-368bec96751d"}}
+{"metaData":{"id":"c29741f1-4649-43a8-b2d9-9d46c42c58e1","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"birthday\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"age\",\"type\":\"short\",\"nullable\":true,\"metadata\":{}},{\"name\":\"salary\",\"type\":\"double\",\"nullab
[...]
+{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
+{"add":{"path":"part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet","partitionValues":{},"size":2316,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4693651733,\"birthday\":\"1999-09-17\",\"name\":\"Employee1\",\"age\":24,\"salary\":83845.11357786917,\"yoe\":3},\"maxValues\":{\"id\":4693651733,\"birthday\":\"1999-09-17\",\"name\":\"Employee1\",\"age\":24,\"salary\":83845.11357786917,\"yoe\":3},\"nullCount\":{\"id\":0,\"
[...]
+{"add":{"path":"part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet","partitionValues":{},"size":2316,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":7132772589,\"birthday\":\"1995-09-11\",\"name\":\"Employee2\",\"age\":28,\"salary\":90140.44051385639,\"yoe\":8},\"maxValues\":{\"id\":7132772589,\"birthday\":\"1995-09-11\",\"name\":\"Employee2\",\"age\":28,\"salary\":90140.44051385639,\"yoe\":8},\"nullCount\":{\"id\":0,\"
[...]
+{"add":{"path":"part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet","partitionValues":{},"size":2455,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":6627278510,\"birthday\":\"2005-01-06\",\"name\":\"Employee3\",\"age\":19,\"salary\":58857.27649436368,\"bonus\":3699.0881,\"yoe\":4,\"last_vacation_time\":\"2024-01-28T08:15:54.648-08:00\"},\"maxValues\":{\"id\":6627278510,\"birthday\":\"2005-01-06\",\"name\":\"Employee3\",
[...]
+{"add":{"path":"part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet","partitionValues":{},"size":2454,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":4786204912,\"birthday\":\"1994-03-10\",\"name\":\"Employee4\",\"age\":29,\"salary\":93646.81222022788,\"bonus\":2334.6675,\"yoe\":5,\"last_vacation_time\":\"2024-01-27T13:15:54.648-08:00\"},\"maxValues\":{\"id\":4786204912,\"birthday\":\"1994-03-10\",\"name\":\"Employee4\",
[...]
+{"add":{"path":"part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet","partitionValues":{},"size":2302,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2773939764,\"birthday\":\"2005-04-28\",\"name\":\"Employee5\",\"age\":18,\"salary\":66300.05339373322,\"yoe\":3},\"maxValues\":{\"id\":2773939764,\"birthday\":\"2005-04-28\",\"name\":\"Employee5\",\"age\":18,\"salary\":66300.05339373322,\"yoe\":3},\"nullCount\":{\"id\":0,\"
[...]
+{"add":{"path":"part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet","partitionValues":{},"size":2316,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8333438088,\"birthday\":\"1998-12-11\",\"name\":\"Employee6\",\"age\":25,\"salary\":59219.5257906128,\"yoe\":4},\"maxValues\":{\"id\":8333438088,\"birthday\":\"1998-12-11\",\"name\":\"Employee6\",\"age\":25,\"salary\":59219.5257906128,\"yoe\":4},\"nullCount\":{\"id\":0,\"bi
[...]
+{"add":{"path":"part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet","partitionValues":{},"size":2317,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8397454007,\"birthday\":\"1998-04-30\",\"name\":\"Employee7\",\"age\":25,\"salary\":61909.733851830584,\"yoe\":8},\"maxValues\":{\"id\":8397454007,\"birthday\":\"1998-04-30\",\"name\":\"Employee7\",\"age\":25,\"salary\":61909.733851830584,\"yoe\":8},\"nullCount\":{\"id\":0,
[...]
+{"add":{"path":"part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet","partitionValues":{},"size":2455,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8925359945,\"birthday\":\"2002-12-03\",\"name\":\"Employee8\",\"age\":21,\"salary\":76588.05471316943,\"bonus\":3000.0154,\"yoe\":1,\"last_vacation_time\":\"2024-01-25T07:15:54.648-08:00\"},\"maxValues\":{\"id\":8925359945,\"birthday\":\"2002-12-03\",\"name\":\"Employee8\",
[...]
+{"add":{"path":"part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet","partitionValues":{},"size":2455,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":8154788551,\"birthday\":\"2001-05-14\",\"name\":\"Employee9\",\"age\":22,\"salary\":59787.98539015684,\"bonus\":4463.3833,\"yoe\":4,\"last_vacation_time\":\"2024-01-25T03:15:54.648-08:00\"},\"maxValues\":{\"id\":8154788551,\"birthday\":\"2001-05-14\",\"name\":\"Employee9\",
[...]
+{"add":{"path":"part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet","partitionValues":{},"size":2324,"modificationTime":1706498158981,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":5884382356,\"birthday\":\"1998-11-29\",\"name\":\"Employee10\",\"age\":25,\"salary\":51565.91965119349,\"yoe\":9},\"maxValues\":{\"id\":5884382356,\"birthday\":\"1998-11-29\",\"name\":\"Employee10\",\"age\":25,\"salary\":51565.91965119349,\"yoe\":9},\"nullCount\":{\"id\":0,
[...]
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000001.json
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000001.json
new file mode 100644
index 00000000000..188f2417e6a
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/_delta_log/00000000000000000001.json
@@ -0,0 +1,6 @@
+{"commitInfo":{"timestamp":1706498178162,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"6","numOutputRows":"5","numOutputBytes":"12655"},"engineInfo":"Apache-Spark/3.5.0
Delta-Lake/3.0.0","txnId":"38049998-65f7-4ed6-8d48-e51c5da502c6"}}
+{"add":{"path":"part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet","partitionValues":{},"size":2301,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":867799346,\"birthday\":\"2003-07-11\",\"name\":\"Employee1\",\"age\":20,\"salary\":87642.55209817083,\"yoe\":4},\"maxValues\":{\"id\":867799346,\"birthday\":\"2003-07-11\",\"name\":\"Employee1\",\"age\":20,\"salary\":87642.55209817083,\"yoe\":4},\"nullCount\":{\"id\":0,\"bi
[...]
+{"add":{"path":"part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet","partitionValues":{},"size":2317,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":9963151889,\"birthday\":\"2002-10-24\",\"name\":\"Employee2\",\"age\":21,\"salary\":79404.63969727767,\"yoe\":2},\"maxValues\":{\"id\":9963151889,\"birthday\":\"2002-10-24\",\"name\":\"Employee2\",\"age\":21,\"salary\":79404.63969727767,\"yoe\":2},\"nullCount\":{\"id\":0,\"
[...]
+{"add":{"path":"part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet","partitionValues":{},"size":2302,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":2766777393,\"birthday\":\"1998-03-18\",\"name\":\"Employee3\",\"age\":25,\"salary\":92418.21424435009,\"yoe\":9},\"maxValues\":{\"id\":2766777393,\"birthday\":\"1998-03-18\",\"name\":\"Employee3\",\"age\":25,\"salary\":92418.21424435009,\"yoe\":9},\"nullCount\":{\"id\":0,\"
[...]
+{"add":{"path":"part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet","partitionValues":{},"size":2302,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":6320361986,\"birthday\":\"2004-01-02\",\"name\":\"Employee4\",\"age\":20,\"salary\":97907.76612488469,\"yoe\":3},\"maxValues\":{\"id\":6320361986,\"birthday\":\"2004-01-02\",\"name\":\"Employee4\",\"age\":20,\"salary\":97907.76612488469,\"yoe\":3},\"nullCount\":{\"id\":0,\"
[...]
+{"add":{"path":"part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet","partitionValues":{},"size":2454,"modificationTime":1706498176452,"dataChange":true,"stats":"{\"numRecords\":1,\"minValues\":{\"id\":7068152260,\"birthday\":\"1996-02-11\",\"name\":\"Employee5\",\"age\":27,\"salary\":79037.77202099308,\"bonus\":4982.215,\"yoe\":9,\"last_vacation_time\":\"2024-01-26T00:16:12.196-08:00\"},\"maxValues\":{\"id\":7068152260,\"birthday\":\"1996-02-11\",\"name\":\"Employee5\",\
[...]
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet
new file mode 100644
index 00000000000..d8882215890
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-b17c520a-0c50-4e49-b8e7-46132a57d039-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet
new file mode 100644
index 00000000000..f007f82b3b6
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00000-f0224389-c0df-4dbc-90e5-de1d6a5b5ac6-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet
new file mode 100644
index 00000000000..549b403d153
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-e18dc7d0-db98-40f2-9185-45237f51b9bf-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet
new file mode 100644
index 00000000000..006b8482ac8
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00001-f5c4b19d-a2b2-4189-8927-97fe1720df8d-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet
new file mode 100644
index 00000000000..d3acfdf32fe
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00002-b2249397-0232-4a5c-b504-62c7c27702c1-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet
new file mode 100644
index 00000000000..d76352414b3
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-648766cd-8ebd-475a-afbb-44ae0b9cba30-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet
new file mode 100644
index 00000000000..19fc8efe4f4
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00003-fa1d35b8-bb75-4145-ac40-6ccbc04acc79-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet
new file mode 100644
index 00000000000..3490b566213
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00004-d580234a-54cb-43b7-87db-448c67a315df-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet
new file mode 100644
index 00000000000..bdf5e95f168
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-963e7ff5-c414-444c-8984-6baecf6987ee-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet
new file mode 100644
index 00000000000..3cbe3b65a87
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00005-9ff9b585-5a9e-415f-b28a-a85d960ccb04-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet
new file mode 100644
index 00000000000..ec97497ff59
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00006-78cd057e-faaa-477d-b5fd-d00a857f7e54-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet
new file mode 100644
index 00000000000..6145827b330
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-00eb0d30-e71e-4092-8ea2-0ee576ca7327-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet
new file mode 100644
index 00000000000..858a8a10b18
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00007-15147217-b81a-45ab-92d4-24d725cc07e1-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet
new file mode 100644
index 00000000000..fa13a5f9eaf
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00008-9f526a56-2392-4f1c-8c07-3dac19b12e91-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet
new file mode 100644
index 00000000000..13863ff9731
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-e21486a5-e177-4c02-b256-bc890fadce7e-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet
new file mode 100644
index 00000000000..1f314548386
Binary files /dev/null and
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/employee-delta-table/part-00009-ee9dd918-aaec-4f80-bd63-e369c6335699-c000.snappy.parquet
differ
diff --git
a/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt
new file mode 100644
index 00000000000..8a846d26385
--- /dev/null
+++
b/extensions-contrib/druid-deltalake-extensions/src/test/resources/requirements.txt
@@ -0,0 +1,2 @@
+delta-spark==3.0.0
+pyspark==3.5.0
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index d97e8708367..d190f405ed1 100644
--- a/pom.xml
+++ b/pom.xml
@@ -230,7 +230,9 @@
<module>extensions-contrib/opentelemetry-emitter</module>
<module>extensions-contrib/kubernetes-overlord-extensions</module>
<module>extensions-contrib/druid-iceberg-extensions</module>
+ <module>extensions-contrib/druid-deltalake-extensions</module>
<module>extensions-contrib/spectator-histogram</module>
+
<!-- distribution packaging -->
<module>distribution</module>
<!-- Revised integration tests -->
diff --git a/web-console/assets/delta.png b/web-console/assets/delta.png
new file mode 100644
index 00000000000..db535506c7b
Binary files /dev/null and b/web-console/assets/delta.png differ
diff --git a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
index 3594f70c992..a675205fdfc 100644
--- a/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
+++ b/web-console/src/druid-models/ingestion-spec/ingestion-spec.tsx
@@ -98,6 +98,7 @@ export type IngestionComboType =
| 'index_parallel:inline'
| 'index_parallel:s3'
| 'index_parallel:azure'
+ | 'index_parallel:delta'
| 'index_parallel:google'
| 'index_parallel:hdfs';
@@ -136,6 +137,7 @@ export function getIngestionComboType(
switch (inputSource.type) {
case 'local':
case 'http':
+ case 'delta':
case 'druid':
case 'inline':
case 's3':
@@ -170,6 +172,9 @@ export function getIngestionTitle(ingestionType:
IngestionComboTypeWithExtra): s
case 'index_parallel:azure':
return 'Azure Data Lake';
+ case 'index_parallel:delta':
+ return 'Delta Lake';
+
case 'index_parallel:google':
return 'Google Cloud Storage';
@@ -462,7 +467,7 @@ export function getIoConfigFormFields(ingestionComboType:
IngestionComboType): F
name: 'inputSource.type',
label: 'Source type',
type: 'string',
- suggestions: ['local', 'http', 'inline', 's3', 'azure', 'google', 'hdfs'],
+ suggestions: ['local', 'http', 'inline', 'delta', 's3', 'azure', 'google',
'hdfs'],
info: (
<p>
Druid connects to raw data through{' '}
@@ -895,6 +900,18 @@ export function getIoConfigFormFields(ingestionComboType:
IngestionComboType): F
inputSourceFilter,
];
+ case 'index_parallel:delta':
+ return [
+ inputSourceType,
+ {
+ name: 'inputSource.tablePath',
+ label: 'Delta table path',
+ type: 'string',
+ placeholder: '/path/to/deltaTable',
+ required: true,
+ },
+ ];
+
case 'index_parallel:hdfs':
return [
inputSourceType,
@@ -1085,6 +1102,7 @@ export function getIoConfigTuningFormFields(
case 'index_parallel:s3':
case 'index_parallel:azure':
case 'index_parallel:google':
+ case 'index_parallel:delta':
case 'index_parallel:hdfs':
return [
{
diff --git a/web-console/src/druid-models/input-source/input-source.tsx
b/web-console/src/druid-models/input-source/input-source.tsx
index 4e0647320e5..01b111b957f 100644
--- a/web-console/src/druid-models/input-source/input-source.tsx
+++ b/web-console/src/druid-models/input-source/input-source.tsx
@@ -55,6 +55,9 @@ export interface InputSource {
// inline
data?: string;
+ // delta
+ tablePath?: string;
+
// hdfs
paths?: string | string[];
@@ -111,6 +114,10 @@ export type InputSourceDesc =
type: 'hdfs';
paths?: string | string[];
}
+ | {
+ type: 'delta';
+ tablePath?: string;
+ }
| {
type: 'sql';
database: any;
@@ -158,6 +165,12 @@ export function issueWithInputSource(inputSource:
InputSource | undefined): stri
}
return;
+ case 'delta':
+ if (!inputSource.tablePath) {
+ return 'must have tablePath';
+ }
+ return;
+
case 'hdfs':
if (!inputSource.paths) {
return 'must have paths';
@@ -169,7 +182,18 @@ export function issueWithInputSource(inputSource:
InputSource | undefined): stri
}
}
-const KNOWN_TYPES = ['inline', 'druid', 'http', 'local', 's3', 'azure',
'google', 'hdfs', 'sql'];
+const KNOWN_TYPES = [
+ 'inline',
+ 'druid',
+ 'http',
+ 'local',
+ 's3',
+ 'azure',
+ 'delta',
+ 'google',
+ 'hdfs',
+ 'sql',
+];
export const INPUT_SOURCE_FIELDS: Field<InputSource>[] = [
// inline
@@ -574,6 +598,16 @@ export const INPUT_SOURCE_FIELDS: Field<InputSource>[] = [
required: true,
},
+ // delta lake
+ {
+ name: 'tablePath',
+ label: 'Delta table path',
+ type: 'string',
+ placeholder: '/path/to/deltaTable',
+ defined: typeIsKnown(KNOWN_TYPES, 'delta'),
+ required: true,
+ },
+
// sql
{
name: 'database.type',
diff --git
a/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap
b/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap
index f5956786afd..b0a81a1800a 100644
---
a/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap
+++
b/web-console/src/views/load-data-view/__snapshots__/load-data-view.spec.tsx.snap
@@ -202,6 +202,20 @@ exports[`LoadDataView matches snapshot batch 1`] = `
Google Cloud Storage
</p>
</Blueprint4.Card>
+ <Blueprint4.Card
+ className="ingestion-card"
+ elevation={1}
+ interactive={true}
+ onClick={[Function]}
+ >
+ <img
+ alt="Ingestion tile for index_parallel:delta"
+ src="/some/base_url/assets/delta.png"
+ />
+ <p>
+ Delta Lake
+ </p>
+ </Blueprint4.Card>
<Blueprint4.Card
className="ingestion-card"
elevation={1}
diff --git a/web-console/src/views/load-data-view/load-data-view.tsx
b/web-console/src/views/load-data-view/load-data-view.tsx
index db7e8e0f093..18fdf045a5b 100644
--- a/web-console/src/views/load-data-view/load-data-view.tsx
+++ b/web-console/src/views/load-data-view/load-data-view.tsx
@@ -859,6 +859,7 @@ export class LoadDataView extends
React.PureComponent<LoadDataViewProps, LoadDat
{this.renderIngestionCard('index_parallel:s3')}
{this.renderIngestionCard('index_parallel:azure')}
{this.renderIngestionCard('index_parallel:google')}
+ {this.renderIngestionCard('index_parallel:delta')}
{this.renderIngestionCard('index_parallel:hdfs')}
{this.renderIngestionCard('index_parallel:druid')}
{this.renderIngestionCard('index_parallel:http')}
@@ -921,6 +922,14 @@ export class LoadDataView extends
React.PureComponent<LoadDataViewProps, LoadDat
</>
);
+ case 'index_parallel:delta':
+ return (
+ <>
+ <p>Load data from Delta Lake.</p>
+ <p>Data must be stored in the Delta Lake format.</p>
+ </>
+ );
+
case 'index_parallel:druid':
return (
<>
@@ -1009,6 +1018,7 @@ export class LoadDataView extends
React.PureComponent<LoadDataViewProps, LoadDat
case 'index_parallel:s3':
case 'index_parallel:azure':
case 'index_parallel:google':
+ case 'index_parallel:delta':
case 'index_parallel:hdfs':
case 'kafka':
case 'kinesis':
diff --git
a/web-console/src/views/workbench-view/input-source-step/input-source-info.tsx
b/web-console/src/views/workbench-view/input-source-step/input-source-info.tsx
index 2a7e2361d5e..c5dac4c12f9 100644
---
a/web-console/src/views/workbench-view/input-source-step/input-source-info.tsx
+++
b/web-console/src/views/workbench-view/input-source-step/input-source-info.tsx
@@ -69,6 +69,9 @@ export const InputSourceInfo = React.memo(function
InputSourceInfo(props: InputS
case 'google':
return <p>Load text based, avro, orc, or parquet data from the Google
Blobstore.</p>;
+ case 'delta':
+ return <p>Load data from Delta Lake.</p>;
+
case 'hdfs':
return <p>Load text based, avro, orc, or parquet data from HDFS.</p>;
diff --git a/website/.spelling b/website/.spelling
index 640f9887331..227e97160fb 100644
--- a/website/.spelling
+++ b/website/.spelling
@@ -63,6 +63,7 @@ DRUIDVERSION
DataSketches
DateTime
DateType
+DeltaLakeInputSource
dimensionsSpec
DimensionSpec
DimensionSpecs
@@ -79,6 +80,7 @@ downsamples
downsampling
Dropwizard
dropwizard
+druid-deltalake-extensions
DruidInputSource
DruidSQL
DynamicConfigProvider
@@ -161,6 +163,7 @@ Kerberos
KeyStores
Kinesis
Kubernetes
+Lakehouse
LDAPS
LRU
LZ4
@@ -525,6 +528,7 @@ SVG
symlink
syntaxes
systemFields
+tablePath
tiering
timeseries
Timeseries
@@ -571,6 +575,7 @@ varchar
vectorizable
vectorize
vectorizeVirtualColumns
+versioned
versioning
virtualColumns
w.r.t.
@@ -795,9 +800,11 @@ multi-server
BasicDataSource
LeaderLatch
2.x
-28.x
+28.x
+3.0.x
3.5.x
3.4.x
+3.5.x.
AllowAll
AuthenticationResult
AuthorizationLoadingLookupTest
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]