cryptoe commented on code in PR #14329: URL: https://github.com/apache/druid/pull/14329#discussion_r1217950267
########## docs/development/extensions-contrib/iceberg.md: ########## @@ -0,0 +1,120 @@ +--- +id: iceberg +title: "Iceberg" +--- + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> + +## Iceberg Ingest Extension + +This extension provides [IcebergInputSource](../../ingestion/input-sources.md#iceberg-input-source) which enables ingestion of data stored in the Iceberg table format into Druid. + +Apache Iceberg is an open table format for huge analytic datasets. Even though iceberg manages most of its metadata on metadata files, it is still dependent on a metastore for managing a certain amount of metadata. +These metastores are defined as Iceberg catalogs and this extension supports connecting to the following catalog types: +* Hive metastore catalog +* Local catalog + +Support for AWS Glue and REST based catalogs are not available yet. + +For a given catalog, iceberg table name and filters, The IcebergInputSource works by reading the table from the catalog, applying the filters and extracting all the underlying live data files up to the latest snapshot. +The data files are in either Parquet, ORC or Avro formats and all of these have InputFormat support in Druid. 
The data files typically reside in a warehouse location which could be in HDFS, S3 or the local filesystem. +This extension relies on the existing InputSource connectors in Druid to read the data files from the warehouse. Therefore, the IcebergInputSource can be considered as an intermediate InputSource which provides the file paths for other InputSource implementations. + +### Load the Iceberg Ingest extension + +To use the iceberg extension, add the `druid-iceberg-extensions` to the list of loaded extensions. See [Loading extensions](../../configuration/extensions.md#loading-extensions) for more information. + + +### Hive Metastore catalog + +For Druid to seamlessly talk to the Hive Metastore, ensure that the Hive specific configuration files such as `hive-site.xml` and `core-site.xml` are available in the Druid classpath. Review Comment: Where are they needed? I am assuming they are only needed on the peons? ########## docs/development/extensions-contrib/iceberg.md: ########## @@ -0,0 +1,120 @@ +--- +id: iceberg +title: "Iceberg" +--- + +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> + +## Iceberg Ingest Extension + +This extension provides [IcebergInputSource](../../ingestion/input-sources.md#iceberg-input-source) which enables ingestion of data stored in the Iceberg table format into Druid. + +Apache Iceberg is an open table format for huge analytic datasets. Even though iceberg manages most of its metadata on metadata files, it is still dependent on a metastore for managing a certain amount of metadata. Review Comment: This might need rephrasing. Did you mean a metadata store here ? ########## extensions-contrib/druid-iceberg-extensions/pom.xml: ########## @@ -0,0 +1,314 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <groupId>org.apache.druid.extensions</groupId> + <artifactId>druid-iceberg-extensions</artifactId> + <name>druid-iceberg-extensions</name> + <description>druid-iceberg-extensions</description> + + + <parent> + <artifactId>druid</artifactId> + <groupId>org.apache.druid</groupId> + <version>27.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <properties> Review Comment: Nit: do we require an empty block here ? ########## processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceAdapter.java: ########## @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.data.input; + +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.data.input.impl.LocalInputSourceAdapter; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.parsers.CloseableIterator; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +/** + * A wrapper on top of {@link SplittableInputSource} that handles input source creation. Review Comment: I did not understand the intent of this class. More details in the class level docs would be helpful. ########## extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.druid.iceberg.guice.HiveConf; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.hive.HiveCatalog; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Hive Metastore specific implementation of iceberg catalog. + * Kerberos authentication is performed if the credentials are provided in the catalog properties + */ +public class HiveIcebergCatalog extends IcebergCatalog +{ + public static final String TYPE_KEY = "hive"; + + @JsonProperty + private String warehousePath; + + @JsonProperty + private String catalogUri; + + @JsonProperty + private Map<String, String> catalogProperties; + + private final Configuration configuration; + + private BaseMetastoreCatalog hiveCatalog; + + private static final Logger log = new Logger(HiveIcebergCatalog.class); + + @JsonCreator + public HiveIcebergCatalog( + @JsonProperty("warehousePath") String warehousePath, + @JsonProperty("catalogUri") String catalogUri, + @JsonProperty("catalogProperties") @Nullable + Map<String, String> catalogProperties, + @JacksonInject @HiveConf Configuration configuration + ) + { + this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath cannot be null"); + this.catalogUri = Preconditions.checkNotNull(catalogUri, "catalogUri cannot be null"); + this.catalogProperties = catalogProperties != null ? 
catalogProperties : new HashMap<>(); + this.configuration = configuration; + this.catalogProperties + .forEach(this.configuration::set); + this.hiveCatalog = retrieveCatalog(); + } + + @Override + public BaseMetastoreCatalog retrieveCatalog() + { + if (hiveCatalog == null) { + hiveCatalog = setupCatalog(); + } + return hiveCatalog; + } + + private HiveCatalog setupCatalog() + { + HiveCatalog catalog = new HiveCatalog(); + authenticate(); Review Comment: Do we need to handle remote http/rpc related exceptions here ? ########## extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java: ########## @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JacksonInject; +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import com.google.common.base.Strings; +import org.apache.druid.iceberg.guice.HiveConf; +import org.apache.druid.java.util.common.ISE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.hive.HiveCatalog; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Hive Metastore specific implementation of iceberg catalog. + * Kerberos authentication is performed if the credentials are provided in the catalog properties + */ +public class HiveIcebergCatalog extends IcebergCatalog +{ + public static final String TYPE_KEY = "hive"; + + @JsonProperty + private String warehousePath; + + @JsonProperty + private String catalogUri; + + @JsonProperty + private Map<String, String> catalogProperties; + + private final Configuration configuration; + + private BaseMetastoreCatalog hiveCatalog; + + private static final Logger log = new Logger(HiveIcebergCatalog.class); + + @JsonCreator + public HiveIcebergCatalog( + @JsonProperty("warehousePath") String warehousePath, + @JsonProperty("catalogUri") String catalogUri, + @JsonProperty("catalogProperties") @Nullable + Map<String, String> catalogProperties, + @JacksonInject @HiveConf Configuration configuration + ) + { + this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath cannot be null"); + this.catalogUri = Preconditions.checkNotNull(catalogUri, "catalogUri cannot be null"); + this.catalogProperties = catalogProperties != null ? 
catalogProperties : new HashMap<>(); + this.configuration = configuration; + this.catalogProperties + .forEach(this.configuration::set); + this.hiveCatalog = retrieveCatalog(); + } + + @Override + public BaseMetastoreCatalog retrieveCatalog() + { + if (hiveCatalog == null) { + hiveCatalog = setupCatalog(); + } + return hiveCatalog; + } + + private HiveCatalog setupCatalog() + { + HiveCatalog catalog = new HiveCatalog(); + authenticate(); + catalog.setConf(configuration); + catalogProperties.put("warehouse", warehousePath); + catalogProperties.put("uri", catalogUri); + catalog.initialize("hive", catalogProperties); + return catalog; + } + + private void authenticate() + { + String principal = catalogProperties.getOrDefault("principal", null); Review Comment: Are there other types of authentication methods or only we have support for krb5 in the initial version. In any case we should document this explicitly. ########## extensions-contrib/druid-iceberg-extensions/pom.xml: ########## @@ -0,0 +1,314 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. See the License for the + ~ specific language governing permissions and limitations + ~ under the License. 
+ --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <groupId>org.apache.druid.extensions</groupId> + <artifactId>druid-iceberg-extensions</artifactId> + <name>druid-iceberg-extensions</name> + <description>druid-iceberg-extensions</description> + + + <parent> + <artifactId>druid</artifactId> + <groupId>org.apache.druid</groupId> + <version>27.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <properties> + </properties> + <dependencies> + <dependency> Review Comment: I donot see hadoop 2/ hadoop 3 profiles. For reference you can have a look here : https://github.com/apache/druid/blob/master/extensions-core/hdfs-storage/pom.xml#L142 ########## extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.iceberg.filter.IcebergFilter; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.CloseableIterable; + +import java.util.ArrayList; +import java.util.List; + +/* + * Druid wrapper for an iceberg catalog. + * The configured catalog is used to load the specified iceberg table and retrieve the underlying live data files upto the latest snapshot. + * This does not perform any projections on the table yet, therefore all the underlying columns will be retrieved from the data files. Review Comment: Where is the `icebergFilter`expression filtering happening. Does the filtering happen while pruning the list of the data files that need to be fetched? ########## extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.iceberg.filter.IcebergFilter; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.CloseableIterable; + +import java.util.ArrayList; +import java.util.List; + +/* + * Druid wrapper for an iceberg catalog. + * The configured catalog is used to load the specified iceberg table and retrieve the underlying live data files upto the latest snapshot. + * This does not perform any projections on the table yet, therefore all the underlying columns will be retrieved from the data files. 
+ */ + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY) +public abstract class IcebergCatalog +{ + private static final Logger log = new Logger(IcebergCatalog.class); + + public abstract BaseMetastoreCatalog retrieveCatalog(); + + /** + * Extract the iceberg data files upto the latest snapshot associated with the table + * + * @param tableNamespace The catalog namespace under which the table is defined + * @param tableName The iceberg table name + * @return a list of data file paths + */ + public List<String> extractSnapshotDataFiles( + String tableNamespace, + String tableName, + IcebergFilter icebergFilter + ) + { + Catalog catalog = retrieveCatalog(); + Namespace namespace = Namespace.of(tableNamespace); + String tableIdentifier = tableNamespace + "." + tableName; + + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream() Review Comment: I think this call needs a special error handling to let the user know that there is some connectivity issue or bad configuration is passed. ########## extensions-contrib/druid-iceberg-extensions/pom.xml: ########## @@ -0,0 +1,314 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one + ~ or more contributor license agreements. See the NOTICE file + ~ distributed with this work for additional information + ~ regarding copyright ownership. The ASF licenses this file + ~ to you under the Apache License, Version 2.0 (the + ~ "License"); you may not use this file except in compliance + ~ with the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, + ~ software distributed under the License is distributed on an + ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + ~ KIND, either express or implied. 
See the License for the + ~ specific language governing permissions and limitations + ~ under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + + <groupId>org.apache.druid.extensions</groupId> + <artifactId>druid-iceberg-extensions</artifactId> + <name>druid-iceberg-extensions</name> + <description>druid-iceberg-extensions</description> + + + <parent> + <artifactId>druid</artifactId> + <groupId>org.apache.druid</groupId> + <version>27.0.0-SNAPSHOT</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <modelVersion>4.0.0</modelVersion> + + <properties> + </properties> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <version>${hadoop.compile.version}</version> + <exclusions> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty-buffer</artifactId> + </exclusion> + <exclusion> + <groupId>commons-cli</groupId> + <artifactId>commons-cli</artifactId> + </exclusion> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + </exclusion> + <exclusion> + <groupId>commons-logging</groupId> + <artifactId>commons-logging</artifactId> + </exclusion> + <exclusion> + <groupId>commons-io</groupId> + <artifactId>commons-io</artifactId> + </exclusion> + <exclusion> + <groupId>commons-lang</groupId> + <artifactId>commons-lang</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpclient</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.httpcomponents</groupId> + <artifactId>httpcore</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + </exclusion> + <exclusion> + 
<groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <exclusion> + <groupId>javax.ws.rs</groupId> + <artifactId>jsr311-api</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.code.findbugs</groupId> + <artifactId>jsr305</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.curator</groupId> + <artifactId>curator-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math3</artifactId> + </exclusion> + <exclusion> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + </exclusion> + <exclusion> + <groupId>net.java.dev.jets3t</groupId> + <artifactId>jets3t</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>com.jcraft</groupId> + <artifactId>jsch</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <!-- Following are excluded to remove security vulnerabilities: --> + <exclusion> + <groupId>commons-beanutils</groupId> + <artifactId>commons-beanutils-core</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.iceberg</groupId> + <artifactId>iceberg-spark-runtime-3.3_2.12</artifactId> + 
<version>1.0.0</version> Review Comment: All these versions should go in the main pom ########## extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java: ########## @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonTypeInfo; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.iceberg.filter.IcebergFilter; +import org.apache.druid.java.util.common.IAE; +import org.apache.druid.java.util.common.RE; +import org.apache.druid.java.util.common.logger.Logger; +import org.apache.iceberg.BaseMetastoreCatalog; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.TableScan; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.io.CloseableIterable; + +import java.util.ArrayList; +import java.util.List; + +/* + * Druid wrapper for an iceberg catalog. + * The configured catalog is used to load the specified iceberg table and retrieve the underlying live data files upto the latest snapshot. 
+ * This does not perform any projections on the table yet, therefore all the underlying columns will be retrieved from the data files. + */ + +@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY) +public abstract class IcebergCatalog +{ + private static final Logger log = new Logger(IcebergCatalog.class); + + public abstract BaseMetastoreCatalog retrieveCatalog(); + + /** + * Extract the iceberg data files upto the latest snapshot associated with the table + * + * @param tableNamespace The catalog namespace under which the table is defined + * @param tableName The iceberg table name + * @return a list of data file paths + */ + public List<String> extractSnapshotDataFiles( + String tableNamespace, + String tableName, + IcebergFilter icebergFilter + ) + { + Catalog catalog = retrieveCatalog(); + Namespace namespace = Namespace.of(tableNamespace); + String tableIdentifier = tableNamespace + "." + tableName; + + Thread.currentThread().setContextClassLoader(getClass().getClassLoader()); + TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream() + .filter(tableId -> tableId.toString().equals(tableIdentifier)) + .findFirst() + .orElseThrow(() -> new IAE( + " Couldn't retrieve table identifier for '%s'", + tableIdentifier + )); + + long start = System.currentTimeMillis(); + List<String> dataFilePaths = new ArrayList<>(); + try { + TableScan tableScan = catalog.loadTable(icebergTableIdentifier).newScan(); + + if (icebergFilter != null) { + tableScan = icebergFilter.filter(tableScan); + } + + CloseableIterable<FileScanTask> tasks = tableScan.planFiles(); + CloseableIterable.transform(tasks, FileScanTask::file) + .forEach(dataFile -> dataFilePaths.add(dataFile.path().toString())); + + long duration = System.currentTimeMillis() - start; + log.info("Data file scan and fetch took %d ms", duration); Review Comment: You could also log the number of `dataFilePaths` here ########## 
extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java: ########## @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.druid.iceberg.input; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Preconditions; +import org.apache.druid.data.input.AbstractInputSourceAdapter; +import org.apache.druid.data.input.InputFormat; +import org.apache.druid.data.input.InputRowSchema; +import org.apache.druid.data.input.InputSource; +import org.apache.druid.data.input.InputSourceReader; +import org.apache.druid.data.input.InputSplit; +import org.apache.druid.data.input.SplitHintSpec; +import org.apache.druid.data.input.impl.SplittableInputSource; +import org.apache.druid.iceberg.filter.IcebergFilter; + +import javax.annotation.Nullable; +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.stream.Stream; + +/** + * Inputsource to ingest data managed by the Iceberg table format. 
+ * This inputsource talks to the configured catalog, executes any configured filters and retrieves the data file paths upto the latest snapshot associated with the iceberg table. + * The data file paths are then provided to a native {@link SplittableInputSource} implementation depending on the warehouse source defined. + */ +public class IcebergInputSource implements SplittableInputSource<List<String>> +{ + public static final String TYPE_KEY = "iceberg"; + + @JsonProperty + private final String tableName; + + @JsonProperty + private final String namespace; + + @JsonProperty + private IcebergCatalog icebergCatalog; + + @JsonProperty + private IcebergFilter icebergFilter; + + @JsonProperty + private AbstractInputSourceAdapter warehouseSource; + + private boolean isLoaded = false; + + @JsonCreator + public IcebergInputSource( + @JsonProperty("tableName") String tableName, + @JsonProperty("namespace") String namespace, + @JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter, + @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog, + @JsonProperty("warehouseSource") AbstractInputSourceAdapter warehouseSource Review Comment: Can this be another input source here? We can do some validations here to see if it's only a local or S3 input source -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
