cryptoe commented on code in PR #14329:
URL: https://github.com/apache/druid/pull/14329#discussion_r1217950267


##########
docs/development/extensions-contrib/iceberg.md:
##########
@@ -0,0 +1,120 @@
+---
+id: iceberg 
+title: "Iceberg"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+## Iceberg Ingest Extension
+
+This extension provides [IcebergInputSource](../../ingestion/input-sources.md#iceberg-input-source) which enables ingestion of data stored in the Iceberg table format into Druid.
+
+Apache Iceberg is an open table format for huge analytic datasets. Even though iceberg manages most of its metadata on metadata files, it is still dependent on a metastore for managing a certain amount of metadata.
+These metastores are defined as Iceberg catalogs and this extension supports connecting to the following catalog types:
+* Hive metastore catalog
+* Local catalog
+
+Support for AWS Glue and REST based catalogs are not available yet.
+
+For a given catalog, iceberg table name and filters, The IcebergInputSource works by reading the table from the catalog, applying the filters and extracting all the underlying live data files up to the latest snapshot.
+The data files are in either Parquet, ORC or Avro formats and all of these have InputFormat support in Druid. The data files typically reside in a warehouse location which could be in HDFS, S3 or the local filesystem.
+This extension relies on the existing InputSource connectors in Druid to read the data files from the warehouse. Therefore, the IcebergInputSource can be considered as an intermediate InputSource which provides the file paths for other InputSource implementations.
+
+### Load the Iceberg Ingest extension
+
+To use the iceberg extension, add the `druid-iceberg-extensions` to the list of loaded extensions. See [Loading extensions](../../configuration/extensions.md#loading-extensions) for more information.
+
+
+### Hive Metastore catalog
+
+For Druid to seamlessly talk to the Hive Metastore, ensure that the Hive specific configuration files such as `hive-site.xml` and `core-site.xml` are available in the Druid classpath.

Review Comment:
   Where are they needed? I am assuming they are only needed on the peons?



##########
docs/development/extensions-contrib/iceberg.md:
##########
@@ -0,0 +1,120 @@
+---
+id: iceberg 
+title: "Iceberg"
+---
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+
+## Iceberg Ingest Extension
+
+This extension provides [IcebergInputSource](../../ingestion/input-sources.md#iceberg-input-source) which enables ingestion of data stored in the Iceberg table format into Druid.
+
+Apache Iceberg is an open table format for huge analytic datasets. Even though iceberg manages most of its metadata on metadata files, it is still dependent on a metastore for managing a certain amount of metadata.

Review Comment:
   This might need rephrasing. Did you mean a metadata store here?



##########
extensions-contrib/druid-iceberg-extensions/pom.xml:
##########
@@ -0,0 +1,314 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <groupId>org.apache.druid.extensions</groupId>
+  <artifactId>druid-iceberg-extensions</artifactId>
+  <name>druid-iceberg-extensions</name>
+  <description>druid-iceberg-extensions</description>
+
+
+  <parent>
+    <artifactId>druid</artifactId>
+    <groupId>org.apache.druid</groupId>
+    <version>27.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <properties>

Review Comment:
   Nit: do we require an empty block here?



##########
processing/src/main/java/org/apache/druid/data/input/AbstractInputSourceAdapter.java:
##########
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.data.input;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.impl.LocalInputSourceAdapter;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * A wrapper on top of {@link SplittableInputSource} that handles input source creation.

Review Comment:
   I did not understand the intent of this class. More details in the class-level docs would be helpful.



##########
extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.druid.iceberg.guice.HiveConf;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Hive Metastore specific implementation of iceberg catalog.
+ * Kerberos authentication is performed if the credentials are provided in the catalog properties
+ */
+public class HiveIcebergCatalog extends IcebergCatalog
+{
+  public static final String TYPE_KEY = "hive";
+
+  @JsonProperty
+  private String warehousePath;
+
+  @JsonProperty
+  private String catalogUri;
+
+  @JsonProperty
+  private Map<String, String> catalogProperties;
+
+  private final Configuration configuration;
+
+  private BaseMetastoreCatalog hiveCatalog;
+
+  private static final Logger log = new Logger(HiveIcebergCatalog.class);
+
+  @JsonCreator
+  public HiveIcebergCatalog(
+      @JsonProperty("warehousePath") String warehousePath,
+      @JsonProperty("catalogUri") String catalogUri,
+      @JsonProperty("catalogProperties") @Nullable
+          Map<String, String> catalogProperties,
+      @JacksonInject @HiveConf Configuration configuration
+  )
+  {
+    this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath cannot be null");
+    this.catalogUri = Preconditions.checkNotNull(catalogUri, "catalogUri cannot be null");
+    this.catalogProperties = catalogProperties != null ? catalogProperties : new HashMap<>();
+    this.configuration = configuration;
+    this.catalogProperties
+        .forEach(this.configuration::set);
+    this.hiveCatalog = retrieveCatalog();
+  }
+
+  @Override
+  public BaseMetastoreCatalog retrieveCatalog()
+  {
+    if (hiveCatalog == null) {
+      hiveCatalog = setupCatalog();
+    }
+    return hiveCatalog;
+  }
+
+  private HiveCatalog setupCatalog()
+  {
+    HiveCatalog catalog = new HiveCatalog();
+    authenticate();

Review Comment:
   Do we need to handle remote http/rpc related exceptions here ?
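   A hedged sketch of one way such handling could look (wrapping in Druid's `RE` and catching `RuntimeException` are suggestions on my part, not the PR's code):

```java
private HiveCatalog setupCatalog()
{
  HiveCatalog catalog = new HiveCatalog();
  catalogProperties.put("warehouse", warehousePath);
  catalogProperties.put("uri", catalogUri);
  try {
    authenticate();
    catalog.setConf(configuration);
    // If the metastore is unreachable or misconfigured, surface a clearer
    // error than a bare Thrift/RPC stack trace.
    catalog.initialize("hive", catalogProperties);
  }
  catch (RuntimeException e) {
    throw new RE(e, "Could not connect to Hive Metastore at [%s]", catalogUri);
  }
  return catalog;
}
```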



##########
extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/HiveIcebergCatalog.java:
##########
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.fasterxml.jackson.annotation.JacksonInject;
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
+import org.apache.druid.iceberg.guice.HiveConf;
+import org.apache.druid.java.util.common.ISE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.hive.HiveCatalog;
+
+import javax.annotation.Nullable;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Hive Metastore specific implementation of iceberg catalog.
+ * Kerberos authentication is performed if the credentials are provided in the catalog properties
+ */
+public class HiveIcebergCatalog extends IcebergCatalog
+{
+  public static final String TYPE_KEY = "hive";
+
+  @JsonProperty
+  private String warehousePath;
+
+  @JsonProperty
+  private String catalogUri;
+
+  @JsonProperty
+  private Map<String, String> catalogProperties;
+
+  private final Configuration configuration;
+
+  private BaseMetastoreCatalog hiveCatalog;
+
+  private static final Logger log = new Logger(HiveIcebergCatalog.class);
+
+  @JsonCreator
+  public HiveIcebergCatalog(
+      @JsonProperty("warehousePath") String warehousePath,
+      @JsonProperty("catalogUri") String catalogUri,
+      @JsonProperty("catalogProperties") @Nullable
+          Map<String, String> catalogProperties,
+      @JacksonInject @HiveConf Configuration configuration
+  )
+  {
+    this.warehousePath = Preconditions.checkNotNull(warehousePath, "warehousePath cannot be null");
+    this.catalogUri = Preconditions.checkNotNull(catalogUri, "catalogUri cannot be null");
+    this.catalogProperties = catalogProperties != null ? catalogProperties : new HashMap<>();
+    this.configuration = configuration;
+    this.catalogProperties
+        .forEach(this.configuration::set);
+    this.hiveCatalog = retrieveCatalog();
+  }
+
+  @Override
+  public BaseMetastoreCatalog retrieveCatalog()
+  {
+    if (hiveCatalog == null) {
+      hiveCatalog = setupCatalog();
+    }
+    return hiveCatalog;
+  }
+
+  private HiveCatalog setupCatalog()
+  {
+    HiveCatalog catalog = new HiveCatalog();
+    authenticate();
+    catalog.setConf(configuration);
+    catalogProperties.put("warehouse", warehousePath);
+    catalogProperties.put("uri", catalogUri);
+    catalog.initialize("hive", catalogProperties);
+    return catalog;
+  }
+
+  private void authenticate()
+  {
+    String principal = catalogProperties.getOrDefault("principal", null);

Review Comment:
   Are there other authentication methods, or do we only support krb5 in the initial version?
   In any case, we should document this explicitly.
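   For reference, a sketch of what a krb5-only `authenticate()` might look like (the `keytab` property name and the UGI keytab-login flow are my assumptions about the intended behavior, not confirmed by this hunk):

```java
private void authenticate()
{
  String principal = catalogProperties.getOrDefault("principal", null);
  String keytab = catalogProperties.getOrDefault("keytab", null);
  // Only Kerberos is handled: without both properties, the connection
  // falls through to Hadoop's default (simple) authentication.
  if (!Strings.isNullOrEmpty(principal) && !Strings.isNullOrEmpty(keytab)) {
    UserGroupInformation.setConfiguration(configuration);
    try {
      UserGroupInformation.loginUserFromKeytab(principal, keytab);
    }
    catch (IOException e) {
      throw new ISE(e, "Kerberos login failed for principal [%s]", principal);
    }
  }
}
```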



##########
extensions-contrib/druid-iceberg-extensions/pom.xml:
##########
@@ -0,0 +1,314 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <groupId>org.apache.druid.extensions</groupId>
+  <artifactId>druid-iceberg-extensions</artifactId>
+  <name>druid-iceberg-extensions</name>
+  <description>druid-iceberg-extensions</description>
+
+
+  <parent>
+    <artifactId>druid</artifactId>
+    <groupId>org.apache.druid</groupId>
+    <version>27.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <properties>
+  </properties>
+  <dependencies>
+    <dependency>

Review Comment:
   I do not see hadoop 2 / hadoop 3 profiles. For reference, you can have a look here: https://github.com/apache/druid/blob/master/extensions-core/hdfs-storage/pom.xml#L142



##########
extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.iceberg.filter.IcebergFilter;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * Druid wrapper for an iceberg catalog.
+ * The configured catalog is used to load the specified iceberg table and retrieve the underlying live data files upto the latest snapshot.
+ * This does not perform any projections on the table yet, therefore all the underlying columns will be retrieved from the data files.

Review Comment:
   Where is the `icebergFilter` expression filtering happening?
   Does the filtering happen while pruning the list of data files that need to be fetched?
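   From the `extractSnapshotDataFiles` hunk later in this PR, the filter does appear to be applied at scan-planning time, so Iceberg can prune non-matching data files before they are fetched; roughly:

```java
TableScan tableScan = catalog.loadTable(icebergTableIdentifier).newScan();
if (icebergFilter != null) {
  // An IcebergFilter presumably compiles down to an Iceberg expression,
  // e.g. tableScan.filter(Expressions.equal("column", value))
  tableScan = icebergFilter.filter(tableScan);
}
// planFiles() uses the scan's filter expression to skip files whose
// partition/column metrics cannot match.
CloseableIterable<FileScanTask> tasks = tableScan.planFiles();
```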



##########
extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.iceberg.filter.IcebergFilter;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * Druid wrapper for an iceberg catalog.
+ * The configured catalog is used to load the specified iceberg table and retrieve the underlying live data files upto the latest snapshot.
+ * This does not perform any projections on the table yet, therefore all the underlying columns will be retrieved from the data files.
+ */
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY)
+public abstract class IcebergCatalog
+{
+  private static final Logger log = new Logger(IcebergCatalog.class);
+
+  public abstract BaseMetastoreCatalog retrieveCatalog();
+
+  /**
+   * Extract the iceberg data files upto the latest snapshot associated with the table
+   *
+   * @param tableNamespace The catalog namespace under which the table is defined
+   * @param tableName      The iceberg table name
+   * @return a list of data file paths
+   */
+  public List<String> extractSnapshotDataFiles(
+      String tableNamespace,
+      String tableName,
+      IcebergFilter icebergFilter
+  )
+  {
+    Catalog catalog = retrieveCatalog();
+    Namespace namespace = Namespace.of(tableNamespace);
+    String tableIdentifier = tableNamespace + "." + tableName;
+
+    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+    TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream()

Review Comment:
   I think this call needs special error handling to let the user know that there is a connectivity issue or that a bad configuration was passed.
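   Something along these lines, perhaps (the `RE` wrapper and message wording are suggestions, not the PR's code):

```java
List<TableIdentifier> tableIds;
try {
  tableIds = catalog.listTables(namespace);
}
catch (RuntimeException e) {
  // A failure here usually means the catalog is unreachable or the
  // namespace/configuration is wrong; say so rather than leaking a raw
  // client stack trace to the ingestion task log.
  throw new RE(
      e,
      "Could not list tables in namespace [%s]; check catalog connectivity and configuration",
      tableNamespace
  );
}
```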



##########
extensions-contrib/druid-iceberg-extensions/pom.xml:
##########
@@ -0,0 +1,314 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~   http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing,
+  ~ software distributed under the License is distributed on an
+  ~ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+  ~ KIND, either express or implied.  See the License for the
+  ~ specific language governing permissions and limitations
+  ~ under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <groupId>org.apache.druid.extensions</groupId>
+  <artifactId>druid-iceberg-extensions</artifactId>
+  <name>druid-iceberg-extensions</name>
+  <description>druid-iceberg-extensions</description>
+
+
+  <parent>
+    <artifactId>druid</artifactId>
+    <groupId>org.apache.druid</groupId>
+    <version>27.0.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+  <modelVersion>4.0.0</modelVersion>
+
+  <properties>
+  </properties>
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-common</artifactId>
+      <version>${hadoop.compile.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.netty</groupId>
+          <artifactId>netty-buffer</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-cli</groupId>
+          <artifactId>commons-cli</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>log4j</groupId>
+          <artifactId>log4j</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-codec</groupId>
+          <artifactId>commons-codec</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-logging</groupId>
+          <artifactId>commons-logging</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-io</groupId>
+          <artifactId>commons-io</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>commons-lang</groupId>
+          <artifactId>commons-lang</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpclient</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.httpcomponents</groupId>
+          <artifactId>httpcore</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.zookeeper</groupId>
+          <artifactId>zookeeper</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.slf4j</groupId>
+          <artifactId>slf4j-log4j12</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>javax.ws.rs</groupId>
+          <artifactId>jsr311-api</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.code.findbugs</groupId>
+          <artifactId>jsr305</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty-util</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.protobuf</groupId>
+          <artifactId>protobuf-java</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-core</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.curator</groupId>
+          <artifactId>curator-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.commons</groupId>
+          <artifactId>commons-math3</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.google.guava</groupId>
+          <artifactId>guava</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.avro</groupId>
+          <artifactId>avro</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>net.java.dev.jets3t</groupId>
+          <artifactId>jets3t</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-json</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.jcraft</groupId>
+          <artifactId>jsch</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.mortbay.jetty</groupId>
+          <artifactId>jetty</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>com.sun.jersey</groupId>
+          <artifactId>jersey-server</artifactId>
+        </exclusion>
+        <!-- Following are excluded to remove security vulnerabilities: -->
+        <exclusion>
+          <groupId>commons-beanutils</groupId>
+          <artifactId>commons-beanutils-core</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-spark-runtime-3.3_2.12</artifactId>
+      <version>1.0.0</version>

Review Comment:
   All these versions should go in the main pom.



##########
extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergCatalog.java:
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.iceberg.filter.IcebergFilter;
+import org.apache.druid.java.util.common.IAE;
+import org.apache.druid.java.util.common.RE;
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.iceberg.BaseMetastoreCatalog;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.io.CloseableIterable;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/*
+ * Druid wrapper for an iceberg catalog.
+ * The configured catalog is used to load the specified iceberg table and retrieve the underlying live data files upto the latest snapshot.
+ * This does not perform any projections on the table yet, therefore all the underlying columns will be retrieved from the data files.
+ */
+
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = InputFormat.TYPE_PROPERTY)
+public abstract class IcebergCatalog
+{
+  private static final Logger log = new Logger(IcebergCatalog.class);
+
+  public abstract BaseMetastoreCatalog retrieveCatalog();
+
+  /**
+   * Extract the iceberg data files upto the latest snapshot associated with the table
+   *
+   * @param tableNamespace The catalog namespace under which the table is defined
+   * @param tableName      The iceberg table name
+   * @return a list of data file paths
+   */
+  public List<String> extractSnapshotDataFiles(
+      String tableNamespace,
+      String tableName,
+      IcebergFilter icebergFilter
+  )
+  {
+    Catalog catalog = retrieveCatalog();
+    Namespace namespace = Namespace.of(tableNamespace);
+    String tableIdentifier = tableNamespace + "." + tableName;
+
+    Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
+    TableIdentifier icebergTableIdentifier = catalog.listTables(namespace).stream()
+                                                    .filter(tableId -> tableId.toString().equals(tableIdentifier))
+                                                    .findFirst()
+                                                    .orElseThrow(() -> new IAE(
+                                                        " Couldn't retrieve table identifier for '%s'",
+                                                        tableIdentifier
+                                                    ));
+
+    long start = System.currentTimeMillis();
+    List<String> dataFilePaths = new ArrayList<>();
+    try {
+      TableScan tableScan = catalog.loadTable(icebergTableIdentifier).newScan();
+
+      if (icebergFilter != null) {
+        tableScan = icebergFilter.filter(tableScan);
+      }
+
+      CloseableIterable<FileScanTask> tasks = tableScan.planFiles();
+      CloseableIterable.transform(tasks, FileScanTask::file)
+                       .forEach(dataFile -> dataFilePaths.add(dataFile.path().toString()));
+
+      long duration = System.currentTimeMillis() - start;
+      log.info("Data file scan and fetch took %d ms", duration);

Review Comment:
   You could also log the number of `dataFilePaths` here
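
   A minimal sketch of what that combined log line could produce. `LogCountSketch` and `formatScanLog` are illustrative names, not part of the PR; Druid's `Logger` accepts `String.format`-style arguments, so the count and the duration can go into a single message:

   ```java
   public class LogCountSketch
   {
     // Builds the message the suggested log.info call would emit; extracting
     // it as a method just makes the format string easy to exercise directly.
     static String formatScanLog(int fileCount, long durationMs)
     {
       return String.format("Data file scan and fetch of %d files took %d ms", fileCount, durationMs);
     }

     public static void main(String[] args)
     {
       System.out.println(formatScanLog(2, 42L));
     }
   }
   ```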



##########
extensions-contrib/druid-iceberg-extensions/src/main/java/org/apache/druid/iceberg/input/IcebergInputSource.java:
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.iceberg.input;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.data.input.AbstractInputSourceAdapter;
+import org.apache.druid.data.input.InputFormat;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.InputSource;
+import org.apache.druid.data.input.InputSourceReader;
+import org.apache.druid.data.input.InputSplit;
+import org.apache.druid.data.input.SplitHintSpec;
+import org.apache.druid.data.input.impl.SplittableInputSource;
+import org.apache.druid.iceberg.filter.IcebergFilter;
+
+import javax.annotation.Nullable;
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * InputSource to ingest data managed by the Iceberg table format.
+ * This input source talks to the configured catalog, applies any configured filters and retrieves the data file paths up to the latest snapshot associated with the iceberg table.
+ * The data file paths are then provided to a native {@link SplittableInputSource} implementation depending on the warehouse source defined.
+ */
+public class IcebergInputSource implements SplittableInputSource<List<String>>
+{
+  public static final String TYPE_KEY = "iceberg";
+
+  @JsonProperty
+  private final String tableName;
+
+  @JsonProperty
+  private final String namespace;
+
+  @JsonProperty
+  private IcebergCatalog icebergCatalog;
+
+  @JsonProperty
+  private IcebergFilter icebergFilter;
+
+  @JsonProperty
+  private AbstractInputSourceAdapter warehouseSource;
+
+  private boolean isLoaded = false;
+
+  @JsonCreator
+  public IcebergInputSource(
+      @JsonProperty("tableName") String tableName,
+      @JsonProperty("namespace") String namespace,
+      @JsonProperty("icebergFilter") @Nullable IcebergFilter icebergFilter,
+      @JsonProperty("icebergCatalog") IcebergCatalog icebergCatalog,
+      @JsonProperty("warehouseSource") AbstractInputSourceAdapter warehouseSource

Review Comment:
   Can this be another input source here?
   We can do some validation here to check that it's only a local or S3 input source
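
   A hedged sketch of the allow-list check this comment suggests. The class name, method name, and the `"local"`/`"s3"` type strings are hypothetical; a real implementation would inspect the type of the deserialized `warehouseSource` in the constructor rather than a raw string:

   ```java
   import java.util.Arrays;
   import java.util.List;

   public class WarehouseSourceValidation
   {
     // Hypothetical allow-list of warehouse input source type keys.
     private static final List<String> ALLOWED_TYPES = Arrays.asList("local", "s3");

     static boolean isAllowedWarehouseType(String typeName)
     {
       return ALLOWED_TYPES.contains(typeName);
     }

     // Throws early, the way a @JsonCreator constructor could, so a
     // misconfigured spec fails at deserialization time rather than at read time.
     static void validateWarehouseSourceType(String typeName)
     {
       if (!isAllowedWarehouseType(typeName)) {
         throw new IllegalArgumentException(
             "Unsupported warehouseSource type [" + typeName + "]; expected one of " + ALLOWED_TYPES
         );
       }
     }

     public static void main(String[] args)
     {
       validateWarehouseSourceType("s3");
       System.out.println("s3 accepted");
     }
   }
   ```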



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

