This is an automated email from the ASF dual-hosted git repository.

htowaileb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 6519550a89 [ASTERIXDB-3634][EXT]: Add support to Iceberg pt.2
6519550a89 is described below

commit 6519550a89fc7804a3e0f96982981288b18458a8
Author: Hussain Towaileb <[email protected]>
AuthorDate: Sun Sep 21 16:46:35 2025 +0300

    [ASTERIXDB-3634][EXT]: Add support to Iceberg pt.2
    
    Details:
    - Supported AWS Glue catalog.
    - Supported the read path for the AWS Glue -> Parquet pipeline
    - Tested types:
      - boolean
      - integer
      - long
      - float
      - double
      - string
      - uuid
      - decimal (to double)
    - Supported projection
    - Put in place a skeleton for supporting partition pruning
      and filtering
    - Custom AWS client providers to support all auth methods
    
    Ext-ref: MB-63115
    
    Change-Id: I531c613fe061367b2cb5c187bc985b39d6de17d1
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20478
    Reviewed-by: Hussain Towaileb <[email protected]>
    Tested-by: Hussain Towaileb <[email protected]>
---
 .../rules/PushValueAccessAndFilterDownRule.java    |   2 +
 .../IcebergTableFilterPushdownProcessor.java       |  64 +++++
 .../asterix/app/translator/QueryTranslator.java    |  32 ++-
 .../handlers/CatalogStatementHandler.java          |   4 +-
 .../api/common/LocalCloudUtilAdobeMock.java        |   9 -
 .../catalog/all-supported-types/test.000.ddl.sqlpp |   4 +-
 .../asterix/common/exceptions/ErrorCode.java       |   3 +
 .../asterix/common/metadata/IMetadataLockUtil.java |   2 +-
 .../src/main/resources/asx_errormsg/en.properties  |   3 +
 asterixdb/asterix-external-data/pom.xml            |  21 ++
 .../adapter/factory/GenericAdapterFactory.java     |  13 +
 .../IIcebergRecordReaderFactory.java}              |  14 +-
 .../awsclient/EnsureCloseAWSClientFactory.java     | 132 ++++++++++
 .../filter/IcebergTableFilterEvaluatorFactory.java |  52 ++++
 .../NoOpIcebergTableFilterEvaluatorFactory.java    |  46 ++++
 .../aws/iceberg/IcebergFileRecordReader.java       | 177 +++++++++++++
 .../iceberg/IcebergParquetRecordReaderFactory.java | 212 +++++++++++++++
 .../iceberg/converter/IcebergConverterContext.java | 107 ++++++++
 .../external/parser/IcebergParquetDataParser.java  | 292 +++++++++++++++++++++
 .../factory/IcebergParserFactory.java}             |  19 +-
 .../IcebergTableParquetDataParserFactory.java      |  64 +++++
 .../provider/DatasourceFactoryProvider.java        |  19 +-
 .../external/provider/ParserFactoryProvider.java   |   9 +-
 .../external/util/ExternalDataConstants.java       |  11 +
 .../asterix/external/util/ExternalDataUtils.java   |  37 ++-
 .../apache/asterix/external/util/HDFSUtils.java    |  23 +-
 .../asterix/external/util/aws/glue/GlueUtils.java  |  48 ++++
 .../asterix/external/util/aws/s3/S3Utils.java      |   9 +-
 .../google/biglake_metastore/BiglakeMetastore.java |  77 ++++++
 .../external/util/iceberg/IcebergConstants.java    |  15 +-
 .../external/util/iceberg/IcebergUtils.java        | 114 ++++++++
 ....apache.asterix.external.api.IDataParserFactory |   3 +-
 ...pache.asterix.external.api.IRecordReaderFactory |   3 +-
 .../metadata/declared/MetadataProvider.java        |  22 ++
 .../apache/asterix/metadata/utils/DatasetUtil.java |   6 +
 .../apache/asterix/metadata/utils/IndexUtil.java   | 100 ++++---
 .../asterix/metadata/utils/MetadataLockUtil.java   |   9 +-
 asterixdb/pom.xml                                  |  58 ++--
 38 files changed, 1705 insertions(+), 130 deletions(-)

diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index bd9e3293a5..ffc6febd23 100644
--- 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -34,6 +34,7 @@ import 
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPu
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.ConsolidateProjectionAndFilterExpressionsProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
+import 
org.apache.asterix.optimizer.rules.pushdown.processor.IcebergTableFilterPushdownProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.processor.ParquetFilterPushdownProcessor;
 import 
org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
@@ -121,6 +122,7 @@ public class PushValueAccessAndFilterDownRule implements 
IAlgebraicRewriteRule {
         // Performs prefix pushdowns
         pushdownProcessorsExecutor.add(new 
ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
         pushdownProcessorsExecutor.add(new 
DeltaTableFilterPushdownProcessor(pushdownContext, context));
+        pushdownProcessorsExecutor.add(new 
IcebergTableFilterPushdownProcessor(pushdownContext, context));
         pushdownProcessorsExecutor.add(new 
ParquetFilterPushdownProcessor(pushdownContext, context));
         pushdownProcessorsExecutor
                 .add(new 
ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context));
diff --git 
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/IcebergTableFilterPushdownProcessor.java
 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/IcebergTableFilterPushdownProcessor.java
new file mode 100644
index 0000000000..03477c5ab0
--- /dev/null
+++ 
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/IcebergTableFilterPushdownProcessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.ARRAY_FUNCTIONS;
+
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import 
org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import 
org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import 
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class IcebergTableFilterPushdownProcessor extends 
ColumnFilterPushdownProcessor {
+
+    public IcebergTableFilterPushdownProcessor(PushdownContext 
pushdownContext, IOptimizationContext context) {
+        super(pushdownContext, context);
+    }
+
+    @Override
+    protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws 
AlgebricksException {
+        return !DatasetUtil.isIcebergTable(scanDefineDescriptor.getDataset());
+    }
+
+    @Override
+    protected boolean isNotPushable(AbstractFunctionCallExpression expression) 
{
+        FunctionIdentifier fid = expression.getFunctionIdentifier();
+        return ARRAY_FUNCTIONS.contains(fid) || 
super.isNotPushable(expression);
+    }
+
+    @Override
+    protected FilterBranch handlePath(AbstractFunctionCallExpression 
expression, IExpectedSchemaNode node)
+            throws AlgebricksException {
+        if (node.getType() != ExpectedSchemaNodeType.ANY) {
+            return FilterBranch.NA;
+        }
+
+        // The inferred path from the provided expression
+        ARecordType expressionPath = 
pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+        paths.put(expression, expressionPath);
+        return FilterBranch.FILTER_PATH;
+    }
+}
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index bf91785ec4..0946e4a4a3 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -120,6 +120,8 @@ import 
org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.WriterValidationUtil;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import 
org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
 import org.apache.asterix.lang.common.base.Expression;
 import org.apache.asterix.lang.common.base.IQueryRewriter;
@@ -190,6 +192,7 @@ import org.apache.asterix.lang.common.struct.Identifier;
 import org.apache.asterix.lang.common.struct.VarIdentifier;
 import org.apache.asterix.lang.common.util.FunctionUtil;
 import org.apache.asterix.lang.common.util.LangDatasetUtil;
+import org.apache.asterix.lang.common.util.LangRecordParseUtil;
 import org.apache.asterix.lang.common.util.ViewUtil;
 import org.apache.asterix.lang.sqlpp.rewrites.SqlppQueryRewriter;
 import org.apache.asterix.metadata.IDatasetDetails;
@@ -200,6 +203,7 @@ import 
org.apache.asterix.metadata.dataset.DatasetFormatInfo;
 import org.apache.asterix.metadata.dataset.hints.DatasetHints;
 import 
org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
 import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Catalog;
 import org.apache.asterix.metadata.entities.CompactionPolicy;
 import org.apache.asterix.metadata.entities.Database;
 import org.apache.asterix.metadata.entities.Dataset;
@@ -877,10 +881,19 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
             return;
         }
 
+        String catalogName = null;
+        if (dd.getDatasetType().equals(DatasetType.EXTERNAL)) {
+            ExternalDetailsDecl edd = (ExternalDetailsDecl) 
dd.getDatasetDetailsDecl();
+            Map<String, String> properties = new 
HashMap<>(edd.getProperties());
+            LangRecordParseUtil.recordToMap(properties, ((DatasetDecl) 
stmt).getWithObjectNode());
+            catalogName = 
properties.get(IcebergConstants.ICEBERG_CATALOG_NAME);
+        }
+
         lockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(), 
databaseName, dataverseName, datasetName,
                 itemTypeDatabase, itemTypeDataverseName, itemTypeName, 
itemTypeAnonymous, metaItemTypeDatabase,
                 metaItemTypeDataverseName, metaItemTypeName, 
metaItemTypeAnonymous, nodegroupName, compactionPolicy,
-                defaultCompactionPolicy, dd.getDatasetType(), 
dd.getDatasetDetailsDecl(), metadataProvider);
+                defaultCompactionPolicy, dd.getDatasetType(), 
dd.getDatasetDetailsDecl(), metadataProvider,
+                catalogName);
         try {
             doCreateDatasetStatement(metadataProvider, dd, 
stmtActiveNamespace, datasetName, itemTypeNamespace,
                     itemTypeExpr, itemTypeName, metaItemTypeExpr, 
metaItemTypeNamespace, metaItemTypeName, hcc,
@@ -1061,6 +1074,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
                     ExternalDataUtils.normalize(properties);
                     ExternalDataUtils.validate(properties);
                     ExternalDataUtils.validateType(properties, (ARecordType) 
itemType);
+                    validateIfIcebergTable(properties, mdTxnCtx, sourceLoc);
                     validateExternalDatasetProperties(externalDetails, 
properties, dd.getSourceLocation(), mdTxnCtx,
                             appCtx, metadataProvider);
                     datasetDetails = new 
ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
@@ -1173,6 +1187,20 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         return Optional.of(dataset);
     }
 
+    private void validateIfIcebergTable(Map<String, String> properties, 
MetadataTransactionContext mdTxnCtx,
+            SourceLocation srcLoc) throws AlgebricksException {
+        if (!IcebergUtils.isIcebergTable(properties)) {
+            return;
+        }
+
+        // ensure the specified catalog exists
+        String catalogName = 
properties.get(IcebergConstants.ICEBERG_CATALOG_NAME);
+        Catalog catalog = MetadataManager.INSTANCE.getCatalog(mdTxnCtx, 
catalogName);
+        if (catalog == null) {
+            throw new CompilationException(ErrorCode.UNKNOWN_CATALOG, srcLoc, 
catalogName);
+        }
+    }
+
     protected boolean isDatasetWithoutTypeSpec(DatasetDecl datasetDecl, 
ARecordType aRecordType,
             ARecordType metaRecType) {
         return aRecordType.getFieldNames().length == 0 && metaRecType == null;
@@ -2927,7 +2955,7 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         }
         lockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(), 
databaseName, dataverseName, viewName,
                 itemTypeDatabaseName, viewItemTypeDataverseName, 
viewItemTypeName, viewItemTypeAnonymous, null, null,
-                null, false, null, null, true, DatasetType.VIEW, null, 
metadataProvider);
+                null, false, null, null, true, DatasetType.VIEW, null, 
metadataProvider, null);
         try {
             doCreateView(metadataProvider, cvs, databaseName, dataverseName, 
viewName, itemTypeDatabaseName,
                     viewItemTypeDataverseName, viewItemTypeName, stmtRewriter, 
requestParameters, creator);
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
index bba267c53d..8c33407e44 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
@@ -82,7 +82,7 @@ public class CatalogStatementHandler {
                 handleDrop();
                 return;
             default:
-                throw new IllegalStateException("Catalog handler handling 
non-catalog statements");
+                throw new IllegalStateException("Catalog statement handler 
handling non-catalog statement: " + kind);
         }
     }
 
@@ -250,7 +250,7 @@ public class CatalogStatementHandler {
     protected void validateIcebergCatalogProperties(CatalogCreateStatement 
statement,
             MetadataTransactionContext mdTxnCtx, MetadataProvider 
metadataProvider) throws AlgebricksException {
         IcebergCatalogCreateStatement icebergStatement = 
(IcebergCatalogCreateStatement) statement;
-        IcebergCatalogDetailsDecl details = (IcebergCatalogDetailsDecl) 
icebergStatement.getCatalogDetailsDecl();
+        IcebergCatalogDetailsDecl details = 
icebergStatement.getCatalogDetailsDecl();
         Map<String, String> allProperties = new 
HashMap<>(details.getProperties());
         LangRecordParseUtil.recordToMap(allProperties, 
icebergStatement.getWithObjectNode());
         IcebergUtils.validateCatalogProperties(allProperties);
diff --git 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtilAdobeMock.java
 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtilAdobeMock.java
index 8509afff0f..be83b09e43 100644
--- 
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtilAdobeMock.java
+++ 
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtilAdobeMock.java
@@ -140,13 +140,4 @@ public class LocalCloudUtilAdobeMock {
             s3Mock = null;
         }
     }
-
-    public static S3Client getS3Client() {
-        S3ClientBuilder builder = S3Client.builder();
-        URI endpoint = URI.create(s3Mock.getHttpEndpoint());
-        
builder.region(Region.of(MOCK_SERVER_REGION)).credentialsProvider(AnonymousCredentialsProvider.create())
-                .endpointOverride(endpoint);
-        S3Client client = builder.build();
-        return client;
-    }
 }
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/catalog/all-supported-types/test.000.ddl.sqlpp
 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/catalog/all-supported-types/test.000.ddl.sqlpp
index c20f27d9ef..657e2676ae 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/catalog/all-supported-types/test.000.ddl.sqlpp
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/catalog/all-supported-types/test.000.ddl.sqlpp
@@ -32,7 +32,7 @@ CREATE CATALOG myRestCatalog
 TYPE Iceberg
 SOURCE REST
 WITH {
-    "warehouse": "s3://cbas-iceberg-playground/my-glue-catalog/warehouse/",
+    "warehouse": "s3://cbas-iceberg-playground/my-rest-catalog/warehouse/",
     "namespace": "pharmacy_namespace",
     "accessKeyId" : "myAccessKeyId",
     "secretAccessKey" : "mySecretAccessKey",
@@ -43,7 +43,7 @@ CREATE CATALOG myBiglakeMetastoreCatalog
 TYPE Iceberg
 SOURCE BIGLAKE_METASTORE
 WITH {
-    "warehouse": "s3://cbas-iceberg-playground/my-glue-catalog/warehouse/",
+    "warehouse": 
"s3://cbas-iceberg-playground/my-biglake-metastore-catalog/warehouse/",
     "namespace": "pharmacy_namespace",
     "accessKeyId" : "myAccessKeyId",
     "secretAccessKey" : "mySecretAccessKey",
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 6a3df32f8c..29aa4a56fc 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -340,6 +340,9 @@ public enum ErrorCode implements IError {
     UPDATE_ATTEMPT_ON_CONFLICTING_PATHS(1232),
     UPDATE_SET_EXPR_MISSING_ON_ROOT_PATH(1233),
     UPDATE_TARGET_NOT_LIST(1234),
+    ICEBERG_NAMESPACE_DOES_NOT_EXIST(1235),
+    ICEBERG_TABLE_DOES_NOT_EXIST(1236),
+    UNSUPPORTED_ICEBERG_DATA_FORMAT(1237),
 
     // Feed errors
     DATAFLOW_ILLEGAL_STATE(3001),
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
index ecd78724e9..07a1442d1b 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
@@ -52,7 +52,7 @@ public interface IMetadataLockUtil {
             String metaItemTypeDatabase, DataverseName 
metaItemTypeDataverseName, String metaItemTypeName,
             boolean metaItemTypeAnonymous, String nodeGroupName, String 
compactionPolicyName,
             boolean isDefaultCompactionPolicy, DatasetConfig.DatasetType 
datasetType, Object datasetDetails,
-            IMetadataProvider metadataProvider) throws AlgebricksException;
+            IMetadataProvider metadataProvider, String catalogName) throws 
AlgebricksException;
 
     void dropDatasetBegin(IMetadataLockManager lockManager, LockList locks, 
String database,
             DataverseName dataverseName, String datasetName) throws 
AlgebricksException;
diff --git 
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties 
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 5a2f5439fe..607eb0b576 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -342,6 +342,9 @@
 1232 = Cannot update both parent and child fields in the same UPDATE statement
 1233 = UPDATE statement cannot set entire record to null. Use DELETE statement 
to remove records
 1234 = Invalid update target, expected a list, but found: %s
+1235 = The provided Iceberg namespace '%1$s' does not exist
+1236 = The provided Iceberg table '%1$s' does not exist
+1237 = The provided data format '%1$s' is not supported for Iceberg tables
 
 # Feed Errors
 3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/pom.xml 
b/asterixdb/asterix-external-data/pom.xml
index 5105e19346..c9ff38d2a4 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -479,6 +479,14 @@
       <groupId>software.amazon.awssdk</groupId>
       <artifactId>apache-client</artifactId>
     </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>dynamodb</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>software.amazon.awssdk</groupId>
+      <artifactId>kms</artifactId>
+    </dependency>
     <dependency>
       <groupId>com.azure</groupId>
       <artifactId>azure-storage-blob</artifactId>
@@ -628,6 +636,19 @@
       <artifactId>hyracks-cloud</artifactId>
       <version>0.3.10-SNAPSHOT</version>
     </dependency>
+    <!-- Iceberg -->
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-aws</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.iceberg</groupId>
+      <artifactId>iceberg-parquet</artifactId>
+    </dependency>
   </dependencies>
   <!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 -->
   <repositories>
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index ecc7833e5d..078c7ca49f 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -31,11 +31,13 @@ import org.apache.asterix.common.metadata.DataverseName;
 import org.apache.asterix.external.api.IDataFlowController;
 import org.apache.asterix.external.api.IDataParserFactory;
 import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IIcebergRecordReaderFactory;
 import org.apache.asterix.external.api.ITypedAdapterFactory;
 import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
 import org.apache.asterix.external.dataset.adapter.FeedAdapter;
 import org.apache.asterix.external.dataset.adapter.GenericAdapter;
 import 
org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.parser.factory.IcebergParserFactory;
 import org.apache.asterix.external.provider.DataflowControllerProvider;
 import org.apache.asterix.external.provider.DatasourceFactoryProvider;
 import org.apache.asterix.external.provider.ParserFactoryProvider;
@@ -46,6 +48,7 @@ import org.apache.asterix.external.util.FeedLogManager;
 import org.apache.asterix.external.util.FeedUtils;
 import org.apache.asterix.external.util.IFeedLogManager;
 import org.apache.asterix.external.util.NoOpFeedLogManager;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import org.apache.asterix.om.types.ARecordType;
 import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -144,6 +147,7 @@ public class GenericAdapterFactory implements 
ITypedAdapterFactory {
         dataSourceFactory.configure(serviceContext, configuration, 
warningCollector, filterEvaluatorFactory);
         ExternalDataUtils.validateDataParserParameters(configuration);
         dataParserFactory = createDataParserFactory(configuration);
+        setProjectedSchemaIfIcebergTable(configuration, dataSourceFactory, 
dataParserFactory);
         dataParserFactory.setRecordType(recordType);
         dataParserFactory.setMetaType(metaType);
         dataParserFactory.configure(configuration);
@@ -152,6 +156,15 @@ public class GenericAdapterFactory implements 
ITypedAdapterFactory {
         nullifyExternalObjects();
     }
 
+    private void setProjectedSchemaIfIcebergTable(Map<String, String> 
configuration,
+            IExternalDataSourceFactory dataSourceFactory, IDataParserFactory 
dataParserFactory) {
+        if (IcebergUtils.isIcebergTable(configuration)) {
+            IIcebergRecordReaderFactory readerFactory = 
(IIcebergRecordReaderFactory) dataSourceFactory;
+            IcebergParserFactory parserFactory = (IcebergParserFactory) 
dataParserFactory;
+            
parserFactory.setProjectedSchema(readerFactory.getProjectedSchema());
+        }
+    }
+
     private void configureFeedLogManager(ICcApplicationContext appCtx)
             throws HyracksDataException, AlgebricksException {
         this.isFeed = ExternalDataUtils.isFeed(configuration);
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIcebergRecordReaderFactory.java
similarity index 63%
copy from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
copy to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIcebergRecordReaderFactory.java
index c5b5da3f36..6d1b1c2754 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIcebergRecordReaderFactory.java
@@ -16,15 +16,11 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.util.iceberg;
+package org.apache.asterix.external.api;
 
-public class IcebergConstants {
-    private IcebergConstants() {
-        throw new AssertionError("do not instantiate");
-    }
+import org.apache.iceberg.Schema;
 
-    public static final String ICEBERG_TABLE_FORMAT = "iceberg";
-    public static final String APACHE_ICEBERG_TABLE_FORMAT = "apache-iceberg";
-    public static final String ICEBERG_SOURCE_PROPERTY_KEY = "catalogSource";
-    public static final String ICEBERG_WAREHOUSE_PROPERTY_KEY = "warehouse";
+public interface IIcebergRecordReaderFactory<T> extends 
IRecordReaderFactory<T> {
+
+    Schema getProjectedSchema();
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/awsclient/EnsureCloseAWSClientFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/awsclient/EnsureCloseAWSClientFactory.java
new file mode 100644
index 0000000000..5f465b6fca
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/awsclient/EnsureCloseAWSClientFactory.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.awsclient;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.util.aws.AwsUtils;
+import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
+import org.apache.asterix.external.util.aws.glue.GlueUtils;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.iceberg.aws.AwsClientFactory;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+
+/**
+ * Custom AWS client factory that ensures assume-role resources (STS client + 
credentials provider)
+ * are closed when the Glue catalog is closed. Achieved by returning a proxy 
GlueClient with a
+ * close() method that performs the extra cleanup.
+ */
+public class EnsureCloseAWSClientFactory implements AwsClientFactory {
+    private static final long serialVersionUID = 1L;
+
+    private Map<String, String> properties;
+    private CloseableAwsClients s3Clients;
+    private CloseableAwsClients glueClients;
+    private GlueClient proxyGlueClient; // cached proxy
+
+    @Override
+    public void initialize(Map<String, String> properties) {
+        this.properties = properties;
+    }
+
+    @Override
+    public S3Client s3() {
+        ensureS3Clients();
+        return (S3Client) s3Clients.getConsumingClient();
+    }
+
+    @Override
+    public S3AsyncClient s3Async() {
+        return null;
+    }
+
+    @Override
+    public GlueClient glue() {
+        ensureGlueClients();
+        return (GlueClient) glueClients.getConsumingClient();
+    }
+
+    @Override
+    public KmsClient kms() {
+        return null;
+    }
+
+    @Override
+    public software.amazon.awssdk.services.dynamodb.DynamoDbClient dynamo() {
+        return null;
+    }
+
+    private void ensureS3Clients() {
+        if (s3Clients == null) {
+            try {
+                s3Clients = S3Utils.buildClient(null, properties);
+            } catch (CompilationException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    private void ensureGlueClients() {
+        if (glueClients == null) {
+            try {
+                glueClients = GlueUtils.buildClient(null, properties);
+            } catch (CompilationException e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /**
+     * Invocation handler that intercepts close() to also close STS + 
credentials provider.
+     */
+    private class GlueClientInvocationHandler implements InvocationHandler {
+        private final GlueClient delegate;
+        private volatile boolean closed = false;
+
+        GlueClientInvocationHandler(GlueClient delegate) {
+            this.delegate = delegate;
+        }
+
+        @Override
+        public Object invoke(Object proxy, Method method, Object[] args) 
throws Throwable {
+            String name = method.getName();
+            if ("close".equals(name)) {
+                if (!closed) {
+                    closed = true;
+                    try {
+                        delegate.close();
+                    } finally {
+                        // Close both sets; no-op if not built or already 
closed
+                        AwsUtils.closeClients(glueClients);
+                        AwsUtils.closeClients(s3Clients);
+                    }
+                }
+                return null;
+            }
+            return method.invoke(delegate, args);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/IcebergTableFilterEvaluatorFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/IcebergTableFilterEvaluatorFactory.java
new file mode 100644
index 0000000000..da40847b4e
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/IcebergTableFilterEvaluatorFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.iceberg.expressions.Expression;
+
/**
 * Filter evaluator factory that carries an Iceberg {@link Expression} so filter
 * predicates can be pushed down to the Iceberg table scan.
 * <p>
 * NOTE(review): {@link #create} returns {@code null} — presumably this factory is a
 * placeholder whose only consumed surface is {@link #getFilterExpression()} (the
 * pushdown skeleton mentioned in the commit); confirm no caller invokes create() on
 * this class directly, otherwise this is an NPE waiting to happen.
 */
public class IcebergTableFilterEvaluatorFactory implements IExternalFilterEvaluatorFactory {
    private static final long serialVersionUID = 1L;
    // Iceberg filter expression to be applied during table scan planning; may be null (no filter).
    private final Expression filterExpression;

    public IcebergTableFilterEvaluatorFactory(Expression expression) {
        this.filterExpression = expression;
    }

    @Override
    public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector)
            throws HyracksDataException {
        // Placeholder: no row-level evaluator; filtering is expected to happen at scan planning.
        return null;
    }

    @SuppressWarnings("unchecked")
    @Override
    public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
        // No values are embedded into records for Iceberg reads.
        return NoOpFilterValueEmbedder.INSTANCE;
    }

    /** @return the Iceberg filter expression held by this factory, or null if none. */
    public Expression getFilterExpression() {
        return filterExpression;
    }
}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpIcebergTableFilterEvaluatorFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpIcebergTableFilterEvaluatorFactory.java
new file mode 100644
index 0000000000..0a18c2ac9f
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpIcebergTableFilterEvaluatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
/**
 * Singleton no-op variant of {@link IcebergTableFilterEvaluatorFactory}: carries no
 * filter expression (null) and hands out the no-op evaluator/embedder instances.
 * Used when no filter is pushed down to the Iceberg scan.
 */
public class NoOpIcebergTableFilterEvaluatorFactory extends IcebergTableFilterEvaluatorFactory {
    public static final IExternalFilterEvaluatorFactory INSTANCE = new NoOpIcebergTableFilterEvaluatorFactory();
    private static final long serialVersionUID = 1L;

    // Singleton: use INSTANCE.
    private NoOpIcebergTableFilterEvaluatorFactory() {
        super(null);
    }

    @Override
    public IExternalFilterEvaluator create(IServiceContext serviceContext, IWarningCollector warningCollector) {
        // Unlike the parent (which returns null), always return the no-op evaluator.
        return NoOpExternalFilterEvaluator.INSTANCE;
    }

    @SuppressWarnings("unchecked")
    @Override
    public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector warningCollector) {
        return NoOpFilterValueEmbedder.INSTANCE;
    }

}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
new file mode 100644
index 0000000000..9f779b8c7e
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.iceberg;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.Parquet;
+
+/**
+ * Iceberg record reader.
+ * The reader returns records in Iceberg Record format.
+ */
+public class IcebergFileRecordReader implements IRecordReader<Record> {
+
+    private final List<FileScanTask> fileScanTasks;
+    private final Schema projectedSchema;
+    private final Map<String, String> configuration;
+    private final IRawRecord<Record> record;
+
+    private int nextTaskIndex = 0;
+    private Catalog catalog;
+    private FileIO tableFileIo;
+    private CloseableIterable<Record> iterable;
+    private Iterator<Record> recordsIterator;
+
+    public IcebergFileRecordReader(List<FileScanTask> fileScanTasks, Schema 
projectedSchema,
+            Map<String, String> configuration) throws HyracksDataException {
+        this.fileScanTasks = fileScanTasks;
+        this.projectedSchema = projectedSchema;
+        this.configuration = configuration;
+        this.record = new GenericRecord<>();
+
+        try {
+            initializeTable();
+        } catch (CompilationException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    private void initializeTable() throws CompilationException {
+        if (fileScanTasks.isEmpty()) {
+            return;
+        }
+
+        String namespace = IcebergUtils.getNamespace(configuration);
+        String tableName = 
configuration.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
+        catalog = 
IcebergUtils.initializeCatalog(IcebergUtils.filterCatalogProperties(configuration),
 namespace);
+        TableIdentifier tableIdentifier = 
TableIdentifier.of(Namespace.of(namespace), tableName);
+        if (!catalog.tableExists(tableIdentifier)) {
+            throw 
CompilationException.create(ErrorCode.ICEBERG_TABLE_DOES_NOT_EXIST, tableName);
+        }
+        Table table = catalog.loadTable(tableIdentifier);
+        tableFileIo = table.io();
+    }
+
+    public boolean hasNext() throws Exception {
+        // iterator has more records
+        if (recordsIterator != null && recordsIterator.hasNext()) {
+            return true;
+        }
+
+        // go to next task
+        // if a file is empty, we will go to the next task
+        while (nextTaskIndex < fileScanTasks.size()) {
+
+            // close previous iterable
+            if (iterable != null) {
+                iterable.close();
+                iterable = null;
+            }
+
+            // Load next task
+            setNextRecordsIterator();
+
+            // if the new iterator has rows → good
+            if (recordsIterator != null && recordsIterator.hasNext()) {
+                return true;
+            }
+
+            // else: this task is empty → continue the loop to the next task
+        }
+
+        // no more tasks & no more rows
+        return false;
+    }
+
+    @Override
+    public IRawRecord<Record> next() throws IOException, InterruptedException {
+        Record icebergRecord = recordsIterator.next();
+        record.set(icebergRecord);
+        return record;
+    }
+
+    @Override
+    public boolean stop() {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (tableFileIo != null) {
+            tableFileIo.close();
+        }
+        if (iterable != null) {
+            iterable.close();
+        }
+
+        try {
+            IcebergUtils.closeCatalog(catalog);
+        } catch (CompilationException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public void setController(AbstractFeedDataFlowController controller) {
+
+    }
+
+    @Override
+    public void setFeedLogManager(IFeedLogManager feedLogManager) throws 
HyracksDataException {
+
+    }
+
+    @Override
+    public boolean handleException(Throwable th) {
+        return false;
+    }
+
+    private void setNextRecordsIterator() {
+        FileScanTask task = fileScanTasks.get(nextTaskIndex++);
+        InputFile inFile = tableFileIo.newInputFile(task.file().location());
+        iterable = 
Parquet.read(inFile).project(projectedSchema).split(task.start(), task.length())
+                .createReaderFunc(fs -> 
GenericParquetReaders.buildReader(projectedSchema, fs)).build();
+        recordsIterator = iterable.iterator();
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
new file mode 100644
index 0000000000..f9af1d153f
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.iceberg;
+
+import static 
org.apache.asterix.external.util.iceberg.IcebergUtils.getProjectedFields;
+import static 
org.apache.asterix.external.util.iceberg.IcebergUtils.setSnapshot;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IIcebergRecordReaderFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
+import 
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+
+public class IcebergParquetRecordReaderFactory implements 
IIcebergRecordReaderFactory<Record> {
+
+    private static final long serialVersionUID = 1L;
+    private static final List<String> RECORD_READER_NAMES =
+            
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
+
+    private final List<FileScanTask> fileScanTasks = new ArrayList<>();
+    private final List<PartitionWorkLoadBasedOnSize> 
partitionWorkLoadsBasedOnSize = new ArrayList<>();
+
+    private Schema projectedSchema;
+    private Map<String, String> configurationCopy;
+
+    private transient AlgebricksAbsolutePartitionConstraint 
partitionConstraint;
+
+    public IcebergParquetRecordReaderFactory() {
+    }
+
+    @Override
+    public Class<?> getRecordClass() throws AsterixException {
+        return Record.class;
+    }
+
+    @Override
+    public List<String> getRecordReaderNames() {
+        return RECORD_READER_NAMES;
+    }
+
+    @Override
+    public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+        return partitionConstraint;
+    }
+
+    private int getPartitionsCount() {
+        return getPartitionConstraint().getLocations().length;
+    }
+
+    @Override
+    public IRecordReader<Record> 
createRecordReader(IExternalDataRuntimeContext context) throws 
HyracksDataException {
+        try {
+            int partition = context.getPartition();
+            return new 
IcebergFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getFileScanTasks(),
+                    projectedSchema, new HashMap<>(configurationCopy));
+        } catch (Exception e) {
+            throw HyracksDataException.create(e);
+        }
+    }
+
+    @Override
+    public Set<String> getReaderSupportedFormats() {
+        return Collections.singleton(IcebergConstants.ICEBERG_PARQUET_FORMAT);
+    }
+
+    @Override
+    public void configure(IServiceContext ctx, Map<String, String> 
configuration, IWarningCollector warningCollector,
+            IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws 
AlgebricksException, HyracksDataException {
+        this.configurationCopy = new HashMap<>(configuration);
+        this.partitionConstraint = ((ICcApplicationContext) 
ctx.getApplicationContext()).getDataPartitioningProvider()
+                .getClusterLocations();
+
+        Catalog catalog = null;
+        Throwable throwable = null;
+        try {
+            String namespace = IcebergUtils.getNamespace(configuration);
+            String tableName = 
configuration.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
+
+            catalog = 
IcebergUtils.initializeCatalog(IcebergUtils.filterCatalogProperties(configuration),
 namespace);
+            TableIdentifier tableIdentifier = 
TableIdentifier.of(Namespace.of(namespace), tableName);
+            if (!catalog.tableExists(tableIdentifier)) {
+                throw 
CompilationException.create(ErrorCode.ICEBERG_TABLE_DOES_NOT_EXIST, tableName);
+            }
+
+            Table table = catalog.loadTable(tableIdentifier);
+            TableScan scan = table.newScan();
+            scan = setSnapshot(configuration, scan);
+            String[] projectedFields = getProjectedFields(configuration);
+            projectedSchema = table.schema();
+            if (projectedFields != null && projectedFields.length > 0) {
+                projectedSchema = projectedSchema.select(projectedFields);
+            }
+            scan.project(projectedSchema);
+            try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+                tasks.forEach(fileScanTasks::add);
+            }
+            distributeWorkLoad(fileScanTasks, getPartitionsCount());
+        } catch (CompilationException ex) {
+            throwable = ex;
+            throw ex;
+        } catch (Exception ex) {
+            throwable = ex;
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, ex.getMessage());
+        } finally {
+            try {
+                IcebergUtils.closeCatalog(catalog);
+            } catch (Exception ex) {
+                if (throwable != null) {
+                    throwable.addSuppressed(ex);
+                } else {
+                    throw 
CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, 
ex.getMessage());
+                }
+            }
+        }
+    }
+
+    private void distributeWorkLoad(List<FileScanTask> fileScanTasks, int 
partitionsCount) {
+        PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new 
PriorityQueue<>(partitionsCount,
+                
Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
+        // Prepare the workloads based on the number of partitions
+        for (int i = 0; i < partitionsCount; i++) {
+            workloadQueue.add(new PartitionWorkLoadBasedOnSize());
+        }
+
+        for (FileScanTask fileScanTask : fileScanTasks) {
+            PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+            workload.addFileScanTask(fileScanTask, fileScanTask.length());
+            workloadQueue.add(workload);
+        }
+        partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
+    }
+
+    @Override
+    public Schema getProjectedSchema() {
+        return projectedSchema;
+    }
+
+    public static class PartitionWorkLoadBasedOnSize implements Serializable {
+        private static final long serialVersionUID = 3L;
+        private final List<FileScanTask> fileScanTasks = new ArrayList<>();
+        private long totalSize = 0;
+
+        public PartitionWorkLoadBasedOnSize() {
+        }
+
+        public List<FileScanTask> getFileScanTasks() {
+            return fileScanTasks;
+        }
+
+        public void addFileScanTask(FileScanTask task, long size) {
+            this.fileScanTasks.add(task);
+            this.totalSize += size;
+        }
+
+        public long getTotalSize() {
+            return totalSize;
+        }
+
+        @Override
+        public String toString() {
+            return "PartitionWorkLoadBasedOnSize{" + "fileScanTasks=" + 
fileScanTasks + ", totalSize=" + totalSize
+                    + '}';
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java
new file mode 100644
index 0000000000..aee5616f19
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.iceberg.converter;
+
+import java.io.DataOutput;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+
+public class IcebergConverterContext extends ParserContext {
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADate> dateSerDer =
+            
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+            
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+    private final boolean decimalToDouble;
+    private final boolean timestampAsLong;
+    private final boolean dateAsInt;
+
+    private final int timeZoneOffset;
+    private final AMutableDate mutableDate = new AMutableDate(0);
+    private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+    private final List<Warning> warnings;
+
+    public IcebergConverterContext(Map<String, String> configuration, 
List<Warning> warnings) {
+        this.warnings = warnings;
+        decimalToDouble = Boolean.parseBoolean(configuration
+                
.getOrDefault(ExternalDataConstants.IcebergOptions.DECIMAL_TO_DOUBLE, 
ExternalDataConstants.FALSE));
+        timestampAsLong = Boolean.parseBoolean(configuration
+                
.getOrDefault(ExternalDataConstants.IcebergOptions.TIMESTAMP_AS_LONG, 
ExternalDataConstants.TRUE));
+        dateAsInt = 
Boolean.parseBoolean(configuration.getOrDefault(ExternalDataConstants.IcebergOptions.DATE_AS_INT,
+                ExternalDataConstants.TRUE));
+        String configuredTimeZoneId = 
configuration.get(ExternalDataConstants.IcebergOptions.TIMEZONE);
+        if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
+            timeZoneOffset = 
TimeZone.getTimeZone(configuredTimeZoneId).getRawOffset();
+        } else {
+            timeZoneOffset = 0;
+        }
+    }
+
+    public void serializeDate(int value, DataOutput output) {
+        try {
+            mutableDate.setValue(value);
+            dateSerDer.serialize(mutableDate, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void serializeDateTime(long timestamp, DataOutput output) {
+        try {
+            mutableDateTime.setValue(timestamp);
+            datetimeSerDer.serialize(mutableDateTime, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public boolean isDecimalToDoubleEnabled() {
+        return decimalToDouble;
+    }
+
+    public int getTimeZoneOffset() {
+        return timeZoneOffset;
+    }
+
+    public boolean isTimestampAsLong() {
+        return timestampAsLong;
+    }
+
+    public boolean isDateAsInt() {
+        return dateAsInt;
+    }
+
+    public List<Warning> getWarnings() {
+        return warnings;
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
new file mode 100644
index 0000000000..634b44d606
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import static 
org.apache.asterix.om.pointables.base.DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import 
org.apache.asterix.external.input.record.reader.aws.iceberg.converter.IcebergConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+
+public class IcebergParquetDataParser extends AbstractDataParser implements 
IRecordDataParser<Record> {
+    private final IcebergConverterContext parserContext;
+    private final IExternalFilterValueEmbedder valueEmbedder;
+    private final Schema projectedSchema;
+
+    public IcebergParquetDataParser(IExternalDataRuntimeContext context, 
Map<String, String> conf,
+            Schema projectedSchema) {
+        List<Warning> warnings = new ArrayList<>();
+        parserContext = new IcebergConverterContext(conf, warnings);
+        valueEmbedder = context.getValueEmbedder();
+        this.projectedSchema = projectedSchema;
+    }
+
+    @Override
+    public boolean parse(IRawRecord<? extends Record> record, DataOutput out) 
throws HyracksDataException {
+        try {
+            parseObject(record.get(), out);
+            valueEmbedder.reset();
+            return true;
+        } catch (AvroRuntimeException | IOException e) {
+            throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
e, getMessageOrToString(e));
+        }
+    }
+
+    private void parseObject(Record record, DataOutput out) throws IOException 
{
+        IMutableValueStorage valueBuffer = parserContext.enterObject();
+        IARecordBuilder objectBuilder = 
parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+        valueEmbedder.enterObject();
+        for (int i = 0; i < projectedSchema.columns().size(); i++) {
+            NestedField field = projectedSchema.columns().get(i);
+            String fieldName = field.name();
+            Type fieldType = field.type();
+            ATypeTag typeTag = getTypeTag(fieldType, record.get(i) == null, 
parserContext);
+            IValueReference value;
+            if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
+                value = valueEmbedder.getEmbeddedValue();
+            } else {
+                valueBuffer.reset();
+                parseValue(fieldType, record, i, valueBuffer.getDataOutput());
+                value = valueBuffer;
+            }
+
+            if (value != null) {
+                // Ignore missing values
+                
objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), value);
+            }
+        }
+
+        embedMissingValues(objectBuilder, parserContext, valueEmbedder);
+        objectBuilder.write(out, true);
+        valueEmbedder.exitObject();
+        parserContext.exitObject(valueBuffer, null, objectBuilder);
+    }
+
+    private void parseArray(Type arrayType, boolean isOptional, List<?> 
listValues, DataOutput out) throws IOException {
+        if (listValues == null) {
+            nullSerde.serialize(ANull.NULL, out);
+            return;
+        }
+        final IMutableValueStorage valueBuffer = 
parserContext.enterCollection();
+        final IAsterixListBuilder arrayBuilder = 
parserContext.getCollectionBuilder(NESTED_OPEN_AORDERED_LIST_TYPE);
+        for (int i = 0; i < listValues.size(); i++) {
+            valueBuffer.reset();
+            //parseValue(elementSchema, elements, i, 
valueBuffer.getDataOutput());
+            arrayBuilder.addItem(valueBuffer);
+        }
+        arrayBuilder.write(out, true);
+        parserContext.exitCollection(valueBuffer, arrayBuilder);
+    }
+
+    public static ATypeTag getTypeTag(Type type, boolean isNull, 
IcebergConverterContext parserContext)
+            throws HyracksDataException {
+        if (isNull) {
+            return ATypeTag.NULL;
+        }
+
+        switch (type.typeId()) {
+            case BOOLEAN:
+                return ATypeTag.BOOLEAN;
+            case INTEGER:
+            case LONG:
+                return ATypeTag.BIGINT;
+            case FLOAT:
+                return ATypeTag.FLOAT;
+            case DOUBLE:
+                return ATypeTag.DOUBLE;
+            case STRING:
+                return ATypeTag.STRING;
+            case UUID:
+                return ATypeTag.UUID;
+            case BINARY:
+                return ATypeTag.BINARY;
+            case DECIMAL:
+                ensureDecimalToDoubleEnabled(type, parserContext);
+                return ATypeTag.DOUBLE;
+            case STRUCT:
+                return ATypeTag.OBJECT;
+            case LIST:
+                return ATypeTag.ARRAY;
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+            case TIMESTAMP_NANO:
+            case FIXED:
+            case GEOMETRY:
+            case GEOGRAPHY:
+            case MAP:
+            case VARIANT:
+            case UNKNOWN:
+                throw new NotImplementedException();
+            default:
+                throw createUnsupportedException(type);
+
+        }
+    }
+
+    private void parseValue(Type fieldType, Record record, int index, 
DataOutput out) throws IOException {
+        Object value = record.get(index);
+        if (value == null) {
+            nullSerde.serialize(ANull.NULL, out);
+            return;
+        }
+
+        switch (fieldType.typeId()) {
+            case BOOLEAN:
+                booleanSerde.serialize((Boolean) value ? ABoolean.TRUE : 
ABoolean.FALSE, out);
+                return;
+            case INTEGER:
+                serializeInteger(value, out);
+                return;
+            case LONG:
+                serializeLong(value, out);
+                return;
+            case FLOAT:
+                // TODO: should this be parsed as double?
+                serializeFloat(value, out);
+                return;
+            case DOUBLE:
+                serializeDouble(value, out);
+                return;
+            case STRING:
+                serializeString(value, out);
+                return;
+            case UUID:
+                serializeUuid(value, out);
+                return;
+            case BINARY:
+                serializeBinary(value, out);
+                return;
+            case DECIMAL:
+                ensureDecimalToDoubleEnabled(fieldType, parserContext);
+                serializeDecimal((BigDecimal) value, out);
+                return;
+            case STRUCT:
+                parseObject((Record) value, out);
+                return;
+            case LIST:
+                Types.ListType listType = fieldType.asListType();
+                parseArray(listType.elementType(), 
listType.isElementOptional(), (List<?>) value, out);
+                return;
+            case DATE:
+            case TIME:
+            case TIMESTAMP:
+            case TIMESTAMP_NANO:
+            case FIXED:
+            case GEOMETRY:
+            case GEOGRAPHY:
+            case MAP:
+            case VARIANT:
+            case UNKNOWN:
+                throw new NotImplementedException();
+
+        }
+    }
+
+    private void serializeInteger(Object value, DataOutput out) throws 
HyracksDataException {
+        int intValue = (Integer) value;
+        aInt64.setValue(intValue);
+        int64Serde.serialize(aInt64, out);
+    }
+
+    private void serializeLong(Object value, DataOutput out) throws 
HyracksDataException {
+        long longValue = (Long) value;
+        aInt64.setValue(longValue);
+        int64Serde.serialize(aInt64, out);
+    }
+
+    private void serializeFloat(Object value, DataOutput out) throws 
HyracksDataException {
+        float floatValue = (Float) value;
+        aFloat.setValue(floatValue);
+        floatSerde.serialize(aFloat, out);
+    }
+
+    private void serializeDouble(Object value, DataOutput out) throws 
HyracksDataException {
+        double doubleValue = (Double) value;
+        aDouble.setValue(doubleValue);
+        doubleSerde.serialize(aDouble, out);
+    }
+
+    private void serializeUuid(Object value, DataOutput out) throws 
HyracksDataException {
+        UUID uuid = (UUID) value;
+        String uuidValue = uuid.toString();
+        char[] buffer = uuidValue.toCharArray();
+        aUUID.parseUUIDString(buffer, 0, uuidValue.length());
+        uuidSerde.serialize(aUUID, out);
+    }
+
+    private void serializeString(Object value, DataOutput out) throws 
HyracksDataException {
+        aString.setValue(value.toString());
+        stringSerde.serialize(aString, out);
+    }
+
+    private void serializeDecimal(BigDecimal value, DataOutput out) throws 
HyracksDataException {
+        serializeDouble(value.doubleValue(), out);
+    }
+
+    private void serializeBinary(Object value, DataOutput out) throws 
HyracksDataException {
+        ByteBuffer byteBuffer = (ByteBuffer) value;
+        aBinary.setValue(byteBuffer.array(), 0, byteBuffer.array().length);
+        binarySerde.serialize(aBinary, out);
+    }
+
+    private static HyracksDataException createUnsupportedException(Type type) {
+        return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Iceberg 
Parser", type.toString());
+    }
+
+    private static void ensureDecimalToDoubleEnabled(Type type, 
IcebergConverterContext context)
+            throws RuntimeDataException {
+        if (!context.isDecimalToDoubleEnabled()) {
+            throw new 
RuntimeDataException(ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION, 
type.toString(),
+                    ExternalDataConstants.ParquetOptions.DECIMAL_TO_DOUBLE);
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergParserFactory.java
similarity index 63%
copy from 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
copy to 
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergParserFactory.java
index c5b5da3f36..18041fde6e 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergParserFactory.java
@@ -16,15 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.asterix.external.util.iceberg;
+package org.apache.asterix.external.parser.factory;
 
-public class IcebergConstants {
-    private IcebergConstants() {
-        throw new AssertionError("do not instantiate");
-    }
+import org.apache.iceberg.Schema;
+
/**
 * Base parser factory for readers over Apache Iceberg tables. Extends the generic data parser
 * factory with the Iceberg {@link Schema} of the projected (requested) columns, which concrete
 * subclasses pass to the parser they create.
 *
 * @param <T> the raw record type produced by the reader and consumed by the parser
 */
public abstract class IcebergParserFactory<T> extends AbstractGenericDataParserFactory<T> {
    private static final long serialVersionUID = 1L;

    // Iceberg schema limited to the columns requested by the query (projection pushdown);
    // set by the adapter before parsers are created
    protected Schema projectedSchema;

    public void setProjectedSchema(Schema projectedSchema) {
        this.projectedSchema = projectedSchema;
    }
}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergTableParquetDataParserFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergTableParquetDataParserFactory.java
new file mode 100644
index 0000000000..9a2ec65d36
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergTableParquetDataParserFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.List;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.IcebergParquetDataParser;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.iceberg.data.Record;
+
+public class IcebergTableParquetDataParserFactory extends 
IcebergParserFactory<Record> {
+
+    private static final long serialVersionUID = 1L;
+    private static final List<String> PARSER_FORMAT = 
List.of(IcebergConstants.ICEBERG_PARQUET_FORMAT);
+
+    @Override
+    public IStreamDataParser 
createInputStreamParser(IExternalDataRuntimeContext context) {
+        throw new UnsupportedOperationException("Stream parser is not 
supported");
+    }
+
+    @Override
+    public void setMetaType(ARecordType metaType) {
+        // no MetaType to set.
+    }
+
+    @Override
+    public List<String> getParserFormats() {
+        return PARSER_FORMAT;
+    }
+
+    @Override
+    public IRecordDataParser<Record> 
createRecordParser(IExternalDataRuntimeContext context) {
+        return createParser(context);
+    }
+
+    @Override
+    public Class<?> getRecordClass() {
+        return Record.class;
+    }
+
+    private IcebergParquetDataParser createParser(IExternalDataRuntimeContext 
context) {
+        return new IcebergParquetDataParser(context, configuration, 
projectedSchema);
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 0360c15fb1..0f0f8d4661 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -44,6 +44,7 @@ import 
org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactor
 import 
org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -124,8 +125,10 @@ public class DatasourceFactoryProvider {
             Map<String, Class<?>> formatClassMap = factories.get(adaptorName);
             String format = 
configuration.get(ExternalDataConstants.KEY_FORMAT);
             if (isDeltaTable(configuration)) {
-                format = configuration.get(ExternalDataConstants.TABLE_FORMAT);
-                return getInstance(formatClassMap.getOrDefault(format, 
formatClassMap.get(DEFAULT_FORMAT)));
+                return getDeltaInstance(configuration, formatClassMap);
+            }
+            if (IcebergUtils.isIcebergTable(configuration)) {
+                return getIcebergInstance(configuration, formatClassMap);
             }
             return getInstance(formatClassMap.getOrDefault(format, 
formatClassMap.get(DEFAULT_FORMAT)));
         }
@@ -197,4 +200,16 @@ public class DatasourceFactoryProvider {
             throw new 
AsterixException(ErrorCode.PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING,
 key);
         }
     }
+
    /**
     * Resolves the record reader factory for a Delta table. The configured table-format
     * property selects the concrete class; falls back to the adapter's default format
     * mapping when no entry exists for that format.
     */
    private static IRecordReaderFactory<?> getDeltaInstance(Map<String, String> configuration,
            Map<String, Class<?>> formatClassMap) throws AsterixException {
        String format = configuration.get(ExternalDataConstants.TABLE_FORMAT);
        return getInstance(formatClassMap.getOrDefault(format, formatClassMap.get(DEFAULT_FORMAT)));
    }
+
    /**
     * Resolves the record reader factory for an Iceberg table. The format key is derived
     * from the Iceberg configuration (e.g. the iceberg-parquet format); falls back to the
     * adapter's default format mapping when no entry exists for that format.
     */
    private static IRecordReaderFactory<?> getIcebergInstance(Map<String, String> configuration,
            Map<String, Class<?>> formatClassMap) throws AsterixException {
        String format = IcebergUtils.getIcebergFormat(configuration);
        return getInstance(formatClassMap.getOrDefault(format, formatClassMap.get(DEFAULT_FORMAT)));
    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 4c10339d99..afa799e782 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -32,6 +32,8 @@ import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.external.api.IDataParserFactory;
 import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import org.apache.commons.io.IOUtils;
 import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 
@@ -51,7 +53,12 @@ public class ParserFactoryProvider {
             //        ExternalDataUtils.getDataverse(configuration), 
parserFactoryName);
             throw new NotImplementedException();
         } else {
-            String parserFactoryKey = 
ExternalDataUtils.getParserFactory(configuration);
+            String parserFactoryKey;
+            if (IcebergUtils.isIcebergTable(configuration)) {
+                parserFactoryKey = IcebergConstants.ICEBERG_PARQUET_FORMAT;
+            } else {
+                parserFactoryKey = 
ExternalDataUtils.getParserFactory(configuration);
+            }
             parserFactory = 
ParserFactoryProvider.getDataParserFactory(parserFactoryKey);
         }
         return parserFactory;
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 2846cf1342..9b270746ac 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -255,6 +255,7 @@ public class ExternalDataConstants {
     public static final String DUMMY_DATABASE_NAME = "dbname";
     public static final String DUMMY_TYPE_NAME = "typeName";
     public static final String DUMMY_DATAVERSE_NAME = "a.b.c";
+    public static final String FORMAT_ICEBERG = "iceberg";
     public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg";
     public static final String FORMAT_DELTA = "delta";
     public static final Set<String> ALL_FORMATS;
@@ -425,6 +426,16 @@ public class ExternalDataConstants {
         public static final String TIMEZONE = "timezone";
     }
 
    /**
     * Configuration option keys for reading Iceberg tables.
     * <p>
     * NOTE(review): the constant names say {@code *_AS_*} while the key strings say
     * {@code *-to-*} (e.g. TIMESTAMP_AS_LONG = "timestamp-to-long"); the strings are the
     * user-facing contract, so renaming either side now would be a breaking change — confirm
     * which naming is intended before the first release that documents these options.
     */
    public static class IcebergOptions {
        private IcebergOptions() {
        }

        // when enabled, decimal columns are converted to double (lossy for high precision)
        public static final String DECIMAL_TO_DOUBLE = "decimal-to-double";
        // when enabled, timestamps are surfaced as epoch longs instead of datetime values
        public static final String TIMESTAMP_AS_LONG = "timestamp-to-long";
        // when enabled, dates are surfaced as epoch-day ints instead of date values
        public static final String DATE_AS_INT = "date-to-int";
        // IANA/Java timezone id used to offset timestamp values; empty means UTC (offset 0)
        public static final String TIMEZONE = "timezone";
    }
+
     public static class ParquetOptions {
         private ParquetOptions() {
         }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 37946103e5..4c08667d8f 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -41,13 +41,16 @@ import static 
org.apache.asterix.external.util.azure.blob.BlobUtils.validateAzur
 import static 
org.apache.asterix.external.util.azure.datalake.DatalakeUtils.validateAzureDataLakeProperties;
 import static 
org.apache.asterix.external.util.google.GCSUtils.configureHdfsJobConf;
 import static 
org.apache.asterix.external.util.google.GCSUtils.validateProperties;
+import static 
org.apache.asterix.external.util.iceberg.IcebergUtils.isIcebergTable;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
 import static 
org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 import static org.msgpack.core.MessagePack.Code.ARRAY16;
 
+import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -512,6 +515,9 @@ public class ExternalDataUtils {
                 configuration.put(ExternalDataConstants.KEY_PARSER, 
ExternalDataConstants.FORMAT_DELTA);
                 configuration.put(ExternalDataConstants.KEY_FORMAT, 
ExternalDataConstants.FORMAT_DELTA);
             }
+            if (isIcebergTable(configuration)) {
+                return;
+            }
             prepareTableFormat(configuration);
         }
     }
@@ -613,7 +619,6 @@ public class ExternalDataUtils {
 
         // If the table is in S3
         if 
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3))
 {
-
             conf.set(S3Constants.HADOOP_ACCESS_KEY_ID, 
configuration.get(AwsConstants.ACCESS_KEY_ID_FIELD_NAME));
             conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
                     
configuration.get(AwsConstants.SECRET_ACCESS_KEY_FIELD_NAME));
@@ -1001,8 +1006,8 @@ public class ExternalDataUtils {
     }
 
     public static boolean supportsPushdown(Map<String, String> properties) {
-        //Currently, only Apache Parquet/Delta table format is supported
-        return isParquetFormat(properties) || isDeltaTable(properties);
+        //Currently, only Apache Parquet/Delta/Iceberg table format is 
supported
+        return isParquetFormat(properties) || isDeltaTable(properties) || 
isIcebergTable(properties);
     }
 
     /**
@@ -1252,4 +1257,30 @@ public class ExternalDataUtils {
         }
         return properties;
     }
+
+    public static ARecordType getExpectedType(String encodedRequestedFields) 
throws IOException {
+        if (ALL_FIELDS_TYPE.getTypeName().equals(encodedRequestedFields)) {
+            //By default, return the entire records
+            return ALL_FIELDS_TYPE;
+        } else if (EMPTY_TYPE.getTypeName().equals(encodedRequestedFields)) {
+            //No fields were requested
+            return EMPTY_TYPE;
+        }
+        //A subset of the fields was requested
+        Base64.Decoder decoder = Base64.getDecoder();
+        byte[] typeBytes = decoder.decode(encodedRequestedFields);
+        DataInputStream dataInputStream = new DataInputStream(new 
ByteArrayInputStream(typeBytes));
+        return 
ExternalDatasetProjectionFiltrationInfo.createTypeField(dataInputStream);
+    }
+
+    public static Map<String, FunctionCallInformation> 
getFunctionCallInformationMap(
+            String encodedFunctionCallInformation) throws IOException {
+        if (encodedFunctionCallInformation != null && 
!encodedFunctionCallInformation.isEmpty()) {
+            Base64.Decoder decoder = Base64.getDecoder();
+            byte[] functionCallInfoMapBytes = 
decoder.decode(encodedFunctionCallInformation);
+            DataInputStream dataInputStream = new DataInputStream(new 
ByteArrayInputStream(functionCallInfoMapBytes));
+            return 
ExternalDatasetProjectionFiltrationInfo.createFunctionCallInformationMap(dataInputStream);
+        }
+        return null;
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 7c7e031768..e57b4d3727 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -21,7 +21,6 @@ package org.apache.asterix.external.util;
 import static 
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
-import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.ByteArrayInputStream;
@@ -61,7 +60,6 @@ import 
org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetReadS
 import org.apache.asterix.external.input.stream.HDFSInputStream;
 import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
 import org.apache.asterix.om.types.ARecordType;
-import 
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
 import org.apache.asterix.runtime.projection.FunctionCallInformation;
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -456,18 +454,7 @@ public class HDFSUtils {
 
    /**
     * Reads the requested-fields type out of the Hadoop job configuration and delegates
     * decoding to the shared helper in ExternalDataUtils.
     */
    public static ARecordType getExpectedType(Configuration configuration) throws IOException {
        String encoded = configuration.get(ExternalDataConstants.KEY_REQUESTED_FIELDS, "");
        return ExternalDataUtils.getExpectedType(encoded);
    }
 
     public static void setFunctionCallInformationMap(Map<String, 
FunctionCallInformation> funcCallInfoMap,
@@ -479,13 +466,7 @@ public class HDFSUtils {
    /**
     * Reads the encoded function-call information out of the Hadoop job configuration and
     * delegates decoding to the shared helper; returns null when nothing was recorded.
     */
    public static Map<String, FunctionCallInformation> getFunctionCallInformationMap(Configuration conf)
            throws IOException {
        String encoded = conf.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, "");
        return ExternalDataUtils.getFunctionCallInformationMap(encoded);
    }
 
     public static void setWarnings(List<Warning> warnings, Configuration conf) 
throws IOException {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
index 0ba702fce8..f68136629c 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
@@ -23,12 +23,24 @@ import static 
org.apache.asterix.external.util.aws.AwsConstants.SERVICE_END_POIN
 import static 
org.apache.asterix.external.util.aws.AwsUtils.buildCredentialsProvider;
 import static 
org.apache.asterix.external.util.aws.AwsUtils.validateAndGetRegion;
 
+import java.io.IOException;
 import java.util.Map;
+import java.util.UUID;
 
 import org.apache.asterix.common.api.IApplicationContext;
 import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.awsclient.EnsureCloseAWSClientFactory;
 import org.apache.asterix.external.util.aws.AwsUtils;
 import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.glue.GlueCatalog;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
 import software.amazon.awssdk.regions.Region;
@@ -36,6 +48,8 @@ import software.amazon.awssdk.services.glue.GlueClient;
 import software.amazon.awssdk.services.glue.GlueClientBuilder;
 
 public class GlueUtils {
+    private static final Logger LOGGER = LogManager.getLogger();
+
     private GlueUtils() {
         throw new AssertionError("do not instantiate");
     }
@@ -64,4 +78,38 @@ public class GlueUtils {
         awsClients.setConsumingClient(builder.build());
         return awsClients;
     }
+
    /**
     * Initializes an Iceberg {@link GlueCatalog} from the given catalog properties and, when a namespace is
     * provided, verifies that the namespace exists in the catalog.
     *
     * @param catalogProperties Iceberg catalog properties; mutated in place to add the S3 FileIO
     *                          implementation and the custom AWS client factory
     * @param namespace         namespace to validate, or {@code null} to skip validation
     * @return the initialized catalog
     * @throws CompilationException if the namespace does not exist or catalog initialization fails
     */
    public static Catalog initializeCatalog(Map<String, String> catalogProperties, String namespace)
            throws CompilationException {
        // using uuid for a custom catalog name, this is not the real catalog name and not used by services
        // it is used by iceberg internally for logging and scoping in multi-catalog environment
        GlueCatalog catalog = new GlueCatalog();
        try {
            catalogProperties.put(CatalogProperties.FILE_IO_IMPL, IcebergConstants.Aws.S3_FILE_IO);
            catalogProperties.put(AwsProperties.CLIENT_FACTORY, EnsureCloseAWSClientFactory.class.getName());

            String catalogName = UUID.randomUUID().toString();
            catalog.initialize(catalogName, catalogProperties);
            LOGGER.debug("Initialized AWS Glue catalog: {}", catalogName);

            if (namespace != null && !catalog.namespaceExists(Namespace.of(namespace))) {
                throw CompilationException.create(ErrorCode.ICEBERG_NAMESPACE_DOES_NOT_EXIST, namespace);
            }
        } catch (CompilationException ex) {
            // rethrow as-is so the specific error code above is not re-wrapped
            throw ex;
        } catch (Throwable ex) {
            // NOTE(review): catching Throwable is very broad — presumably to surface any SDK/Iceberg failure
            // as EXTERNAL_SOURCE_ERROR; confirm Errors (e.g. NoClassDefFoundError) should not propagate
            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex, ex.getMessage());
        }
        return catalog;
    }
+
+    public static void closeCatalog(GlueCatalog catalog) throws 
CompilationException {
+        try {
+            if (catalog != null) {
+                catalog.close();
+            }
+        } catch (IOException ex) {
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, ex.getMessage());
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 8c81c1b9ff..9608108551 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -85,6 +85,7 @@ import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.external.util.ExternalDataUtils;
 import org.apache.asterix.external.util.aws.AwsUtils;
 import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -317,6 +318,13 @@ public class S3Utils {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_FORMAT);
         }
 
+        // iceberg tables can be created without passing the bucket,
+        // only validate bucket presence if container is passed
+        String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+        if (IcebergUtils.isIcebergTable(configuration) && container == null) {
+            return;
+        }
+
         validateIncludeExclude(configuration);
         try {
             // TODO(htowaileb): maybe something better, this will check to 
ensure type is supported before creation
@@ -330,7 +338,6 @@ public class S3Utils {
         S3Client s3Client = (S3Client) awsClients.getConsumingClient();
         S3Response response;
         boolean useOldApi = false;
-        String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
         String prefix = getPrefix(configuration);
 
         try {
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/biglake_metastore/BiglakeMetastore.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/biglake_metastore/BiglakeMetastore.java
new file mode 100644
index 0000000000..9ebbe149a1
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/biglake_metastore/BiglakeMetastore.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.google.biglake_metastore;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.awsclient.EnsureCloseAWSClientFactory;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.glue.GlueCatalog;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class BiglakeMetastore {
+    private static final Logger LOGGER = LogManager.getLogger();
+
+    private BiglakeMetastore() {
+        throw new AssertionError("do not instantiate");
+    }
+
+    public static Catalog initializeCatalog(Map<String, String> 
catalogProperties, String namespace)
+            throws CompilationException {
+        // using uuid for a custom catalog name, this not the real catalog 
name and not used by services
+        // it is used by iceberg internally for logging and scoping in 
multi-catalog environment
+        GlueCatalog catalog = new GlueCatalog();
+        try {
+            catalogProperties.put(CatalogProperties.FILE_IO_IMPL, 
IcebergConstants.Aws.S3_FILE_IO);
+            catalogProperties.put(AwsProperties.CLIENT_FACTORY, 
EnsureCloseAWSClientFactory.class.getName());
+
+            String catalogName = UUID.randomUUID().toString();
+            catalog.initialize(catalogName, catalogProperties);
+            LOGGER.debug("Initialized AWS Glue catalog: {}", catalogName);
+
+            if (namespace != null && 
!catalog.namespaceExists(Namespace.of(namespace))) {
+                throw 
CompilationException.create(ErrorCode.ICEBERG_NAMESPACE_DOES_NOT_EXIST, 
namespace);
+            }
+        } catch (CompilationException ex) {
+            throw ex;
+        } catch (Throwable ex) {
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, ex.getMessage());
+        }
+        return catalog;
+    }
+
+    public static void closeCatalog(GlueCatalog catalog) throws 
CompilationException {
+        try {
+            if (catalog != null) {
+                catalog.close();
+            }
+        } catch (IOException ex) {
+            throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, 
ex, ex.getMessage());
+        }
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
index c5b5da3f36..3921e36225 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
@@ -24,7 +24,20 @@ public class IcebergConstants {
     }
 
     public static final String ICEBERG_TABLE_FORMAT = "iceberg";
-    public static final String APACHE_ICEBERG_TABLE_FORMAT = "apache-iceberg";
+    public static final String ICEBERG_CATALOG_NAME = "catalogName";
     public static final String ICEBERG_SOURCE_PROPERTY_KEY = "catalogSource";
     public static final String ICEBERG_WAREHOUSE_PROPERTY_KEY = "warehouse";
+    public static final String ICEBERG_TABLE_NAME_PROPERTY_KEY = "tableName";
+    public static final String ICEBERG_NAMESPACE_PROPERTY_KEY = "namespace";
+    public static final String ICEBERG_SNAPSHOT_ID_PROPERTY_KEY = "snapshotId";
+    public static final String ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY = 
"snapshotTimestamp";
+
+    public static final String ICEBERG_PROPERTY_PREFIX_INTERNAL = 
"catalog-property#";
+
+    public static final String ICEBERG_PARQUET_FORMAT = "iceberg-parquet";
+    public static final String ICEBERG_AVRO_FORMAT = "iceberg-avro";
+
+    public class Aws {
+        public static final String S3_FILE_IO = 
"org.apache.iceberg.aws.s3.S3FileIO";
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
index 9654ce3877..369b5fe2a6 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
@@ -18,14 +18,29 @@
  */
 package org.apache.asterix.external.util.iceberg;
 
+import static 
org.apache.asterix.common.exceptions.ErrorCode.UNSUPPORTED_ICEBERG_DATA_FORMAT;
+import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_AVRO_FORMAT;
+import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_PARQUET_FORMAT;
 import static 
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_TABLE_FORMAT;
 
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Optional;
 
 import org.apache.asterix.common.config.CatalogConfig;
+import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.exceptions.CompilationException;
 import org.apache.asterix.common.exceptions.ErrorCode;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.glue.GlueUtils;
+import 
org.apache.asterix.external.util.google.biglake_metastore.BiglakeMetastore;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.aws.glue.GlueCatalog;
+import org.apache.iceberg.catalog.Catalog;
 
 public class IcebergUtils {
 
@@ -87,4 +102,103 @@ public class IcebergUtils {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, key);
         }
     }
+
+    public static void validateIcebergTableProperties(Map<String, String> 
properties) throws CompilationException {
+        String tableName = 
properties.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
+        if (tableName == null || tableName.isEmpty()) {
+            throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
+                    IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
+        }
+    }
+
+    /**
+     * Extracts and returns the iceberg catalog properties from the provided 
configuration
+     *
+     * @param configuration configuration
+     * @return catalog properties
+     */
+    public static Map<String, String> filterCatalogProperties(Map<String, 
String> configuration) {
+        Map<String, String> catalogProperties = new HashMap<>();
+        Iterator<Map.Entry<String, String>> iterator = 
configuration.entrySet().iterator();
+        while (iterator.hasNext()) {
+            Map.Entry<String, String> entry = iterator.next();
+            if 
(entry.getKey().startsWith(IcebergConstants.ICEBERG_PROPERTY_PREFIX_INTERNAL)) {
+                catalogProperties.put(
+                        
entry.getKey().substring(IcebergConstants.ICEBERG_PROPERTY_PREFIX_INTERNAL.length()),
+                        entry.getValue());
+                iterator.remove();
+            }
+        }
+        return catalogProperties;
+    }
+
+    /**
+     * Namespace can be null (not passed), or it can be passed for the catalog 
or the collection. If it is passed
+     * for both, namespace for the collection will be used, otherwise, the 
namespace for the catalog will be used.
+     *
+     * @param configuration configuration
+     * @return namespace
+     */
+    public static String getNamespace(Map<String, String> configuration) {
+        String namespace = 
configuration.get(IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY);
+        if (namespace != null) {
+            return namespace;
+        }
+
+        String catalogNamespaceProperty =
+                IcebergConstants.ICEBERG_PROPERTY_PREFIX_INTERNAL + 
IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY;
+        namespace = configuration.get(catalogNamespaceProperty);
+        return namespace;
+    }
+
+    public static String getIcebergFormat(Map<String, String> configuration) 
throws AsterixException {
+        String format = 
configuration.get(ExternalDataConstants.KEY_FORMAT).toLowerCase();
+        return switch (format) {
+            case ExternalDataConstants.FORMAT_PARQUET -> 
ICEBERG_PARQUET_FORMAT;
+            case ExternalDataConstants.FORMAT_AVRO -> ICEBERG_AVRO_FORMAT;
+            default -> throw 
AsterixException.create(UNSUPPORTED_ICEBERG_DATA_FORMAT, format);
+        };
+    }
+
    /**
     * Dispatches catalog initialization to the handler for the configured catalog source.
     *
     * @param catalogProperties catalog properties including the catalog source
     * @param namespace         namespace to validate, or {@code null}
     * @return the initialized catalog; NOTE(review): a REST source currently yields {@code null} —
     *         callers must tolerate a null catalog until REST support is implemented; confirm
     * @throws CompilationException if the catalog source is unsupported or initialization fails
     */
    public static Catalog initializeCatalog(Map<String, String> catalogProperties, String namespace)
            throws CompilationException {
        String source = catalogProperties.get(IcebergConstants.ICEBERG_SOURCE_PROPERTY_KEY);
        Optional<CatalogConfig.IcebergCatalogSource> catalogSource = CatalogConfig.getIcebergCatalogSource(source);
        if (catalogSource.isEmpty()) {
            throw CompilationException.create(ErrorCode.UNSUPPORTED_ICEBERG_CATALOG_SOURCE, source);
        }

        return switch (catalogSource.get()) {
            case CatalogConfig.IcebergCatalogSource.AWS_GLUE -> GlueUtils.initializeCatalog(catalogProperties, namespace);
            case CatalogConfig.IcebergCatalogSource.BIGLAKE_METASTORE -> BiglakeMetastore.initializeCatalog(catalogProperties, namespace);
            case CatalogConfig.IcebergCatalogSource.REST -> null;
        };
    }
+
+    public static void closeCatalog(Catalog catalog) throws 
CompilationException {
+        if (catalog != null) {
+            if (catalog instanceof GlueCatalog) {
+                GlueUtils.closeCatalog((GlueCatalog) catalog);
+            }
+        }
+    }
+
+    public static TableScan setSnapshot(Map<String, String> configuration, 
TableScan scan) {
+        String snapshot = 
configuration.get(IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+        if (snapshot != null) {
+            scan = scan.useSnapshot(Long.parseLong(snapshot));
+        } else {
+            String asOfTimestamp = 
configuration.get(IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
+            if (asOfTimestamp != null) {
+                scan = scan.asOfTime(Long.parseLong(asOfTimestamp));
+            }
+        }
+        return scan;
+    }
+
    /**
     * Returns the names of the fields requested by the pushed-down projection.
     *
     * @param configuration configuration carrying the encoded requested-fields record type
     * @return projected field names
     * @throws IOException if decoding the requested type fails
     */
    public static String[] getProjectedFields(Map<String, String> configuration) throws IOException {
        String encoded = configuration.get(ExternalDataConstants.KEY_REQUESTED_FIELDS);
        // NOTE(review): assumes the requested-fields entry is always present in the configuration —
        // confirm callers guarantee this, otherwise getExpectedType receives null
        ARecordType projectedRecordType = ExternalDataUtils.getExpectedType(encoded);
        return projectedRecordType.getFieldNames();
    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
index 793516ce03..e9ea9a6682 100644
--- 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
+++ 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
@@ -23,4 +23,5 @@ org.apache.asterix.external.parser.factory.RSSParserFactory
 org.apache.asterix.external.parser.factory.TweetParserFactory
 org.apache.asterix.external.parser.factory.NoOpDataParserFactory
 org.apache.asterix.external.parser.factory.AvroDataParserFactory
-org.apache.asterix.external.parser.factory.DeltaTableDataParserFactory
\ No newline at end of file
+org.apache.asterix.external.parser.factory.DeltaTableDataParserFactory
+org.apache.asterix.external.parser.factory.IcebergTableParquetDataParserFactory
\ No newline at end of file
diff --git 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 0eef480f2f..e699937321 100644
--- 
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++ 
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -21,8 +21,9 @@ org.apache.asterix.external.input.HDFSDataSourceFactory
 
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
 
org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory
 org.apache.asterix.external.input.record.reader.aws.AwsS3ReaderFactory
-org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory
 
org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory
+org.apache.asterix.external.input.record.reader.aws.iceberg.IcebergParquetRecordReaderFactory
+org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory
 org.apache.asterix.external.input.record.reader.gcs.GCSReaderFactory
 
org.apache.asterix.external.input.record.reader.azure.blob.AzureBlobReaderFactory
 
org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 01ce9a98d4..f0a803ae11 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -68,6 +68,8 @@ import 
org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
 import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
 import org.apache.asterix.external.provider.AdapterFactoryProvider;
 import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
 import org.apache.asterix.formats.nontagged.LinearizeComparatorFactoryProvider;
@@ -87,6 +89,8 @@ import org.apache.asterix.metadata.entities.FeedPolicyEntity;
 import org.apache.asterix.metadata.entities.FullTextConfigMetadataEntity;
 import org.apache.asterix.metadata.entities.FullTextFilterMetadataEntity;
 import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.metadata.entities.IcebergCatalog;
+import org.apache.asterix.metadata.entities.IcebergCatalogDetails;
 import org.apache.asterix.metadata.entities.Index;
 import org.apache.asterix.metadata.entities.Synonym;
 import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
@@ -1003,13 +1007,31 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
                     dataset.getDataverseName().getCanonicalForm());
             setExternalEntityId(configuration);
             setSourceType(configuration, adapterName);
+
+            // for iceberg table, add catalog properties to the configuration
+            addIcebergCatalogPropertiesIfNeeded(configuration);
             return 
AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(),
 adapterName,
                     configuration, itemType, null, warningCollector, 
filterEvaluatorFactory);
+        } catch (AlgebricksException e) {
+            throw e;
         } catch (Exception e) {
             throw new AlgebricksException("Unable to create adapter", e);
         }
     }
 
+    private void addIcebergCatalogPropertiesIfNeeded(Map<String, String> 
configuration) throws AlgebricksException {
+        if (IcebergUtils.isIcebergTable(configuration)) {
+            String catalogName = 
configuration.get(IcebergConstants.ICEBERG_CATALOG_NAME);
+            IcebergCatalog catalog =
+                    (IcebergCatalog) 
MetadataManager.INSTANCE.getCatalog(getMetadataTxnContext(), catalogName);
+            IcebergCatalogDetails details = (IcebergCatalogDetails) 
catalog.getCatalogDetails();
+            for (Map.Entry<String, String> entry : 
details.getProperties().entrySet()) {
+                
configuration.putIfAbsent(IcebergConstants.ICEBERG_PROPERTY_PREFIX_INTERNAL + 
entry.getKey(),
+                        entry.getValue());
+            }
+        }
+    }
+
     protected void setSourceType(Map<String, String> configuration, String 
adapterName) {
         
configuration.putIfAbsent(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE, 
adapterName);
     }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index e637f28308..de309268b8 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -53,6 +53,7 @@ import org.apache.asterix.common.metadata.MetadataUtil;
 import org.apache.asterix.common.transactions.IRecoveryManager;
 import org.apache.asterix.common.utils.JobUtils;
 import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
 import org.apache.asterix.formats.base.IDataFormat;
 import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
 import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -879,6 +880,11 @@ public class DatasetUtil {
                 .isDeltaTable(((ExternalDatasetDetails) 
dataset.getDatasetDetails()).getProperties());
     }
 
+    public static boolean isIcebergTable(Dataset dataset) {
+        return dataset.getDatasetType() == DatasetType.EXTERNAL
+                && IcebergUtils.isIcebergTable(((ExternalDatasetDetails) 
dataset.getDatasetDetails()).getProperties());
+    }
+
     public static boolean isParquetFormat(Dataset dataset) {
         return dataset.getDatasetType() == DatasetType.EXTERNAL && 
ExternalDataUtils
                 .isParquetFormat(((ExternalDatasetDetails) 
dataset.getDatasetDetails()).getProperties());
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 79db1a0ea4..37cc9d8e5c 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -20,6 +20,7 @@ package org.apache.asterix.metadata.utils;
 
 import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
 import static 
org.apache.asterix.external.util.ExternalDataUtils.isParquetFormat;
+import static 
org.apache.asterix.external.util.iceberg.IcebergUtils.isIcebergTable;
 import static 
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
 import static 
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
 
@@ -44,6 +45,7 @@ import org.apache.asterix.common.transactions.TxnId;
 import org.apache.asterix.external.indexing.ExternalFile;
 import 
org.apache.asterix.external.input.filter.NoOpDeltaTableFilterEvaluatorFactory;
 import 
org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
+import 
org.apache.asterix.external.input.filter.NoOpIcebergTableFilterEvaluatorFactory;
 import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
 import org.apache.asterix.external.util.ExternalDataPrefix;
 import org.apache.asterix.metadata.dataset.DatasetFormatInfo;
@@ -353,43 +355,73 @@ public class IndexUtil {
             IVariableTypeEnvironment typeEnv, IProjectionFiltrationInfo 
projectionFiltrationInfo,
             Map<String, String> properties) throws AlgebricksException {
         if (isDeltaTable(properties)) {
-            if (projectionFiltrationInfo == 
DefaultProjectionFiltrationInfo.INSTANCE) {
-                return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
-            } else {
-                DeltaTableFilterBuilder builder = new DeltaTableFilterBuilder(
-                        (ExternalDatasetProjectionFiltrationInfo) 
projectionFiltrationInfo, context, typeEnv);
-                return builder.build();
-            }
-        } else if (isParquetFormat(properties)) {
-            if (projectionFiltrationInfo == 
DefaultProjectionFiltrationInfo.INSTANCE) {
-                return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
-            } else {
-                ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
-                ExternalDatasetProjectionFiltrationInfo pfi =
-                        (ExternalDatasetProjectionFiltrationInfo) 
projectionFiltrationInfo;
-                IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory 
=
-                        NoOpExternalFilterEvaluatorFactory.INSTANCE;
-                if (!prefix.getPaths().isEmpty()) {
-                    ExternalFilterBuilder externalFilterBuilder =
-                            new ExternalFilterBuilder(pfi, context, typeEnv, 
prefix);
-                    externalFilterEvaluatorFactory = 
externalFilterBuilder.build();
-                }
-                ParquetFilterBuilder builder = new ParquetFilterBuilder(
-                        (ParquetExternalDatasetProjectionFiltrationInfo) 
projectionFiltrationInfo, context, typeEnv);
-                return new 
ParquetFilterEvaluatorFactory(externalFilterEvaluatorFactory,
-                        builder.buildFilterPredicate());
-            }
+            return createDeltaFilter(projectionFiltrationInfo, context, 
typeEnv);
+        }
+        if (isIcebergTable(properties)) {
+            return createIcebergFilter(projectionFiltrationInfo, context, 
typeEnv);
+        }
+        if (isParquetFormat(properties)) {
+            return createParquetFilter(projectionFiltrationInfo, context, 
typeEnv, properties);
+        }
+        return createExternalFilter(projectionFiltrationInfo, context, 
typeEnv, properties);
+    }
+
+    private static IExternalFilterEvaluatorFactory 
createDeltaFilter(IProjectionFiltrationInfo projectionFiltrationInfo,
+            JobGenContext context, IVariableTypeEnvironment typeEnv) throws 
AlgebricksException {
+        if (projectionFiltrationInfo == 
DefaultProjectionFiltrationInfo.INSTANCE) {
+            return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
         } else {
-            if (projectionFiltrationInfo == 
DefaultProjectionFiltrationInfo.INSTANCE) {
-                return NoOpExternalFilterEvaluatorFactory.INSTANCE;
-            } else {
-                ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
-                ExternalDatasetProjectionFiltrationInfo pfi =
-                        (ExternalDatasetProjectionFiltrationInfo) 
projectionFiltrationInfo;
-                ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, 
context, typeEnv, prefix);
-                return build.build();
+            DeltaTableFilterBuilder builder = new DeltaTableFilterBuilder(
+                    (ExternalDatasetProjectionFiltrationInfo) 
projectionFiltrationInfo, context, typeEnv);
+            return builder.build();
+        }
+    }
+
+    private static IExternalFilterEvaluatorFactory createIcebergFilter(
+            IProjectionFiltrationInfo projectionFiltrationInfo, JobGenContext 
context, IVariableTypeEnvironment typeEnv)
+            throws AlgebricksException {
+        if (projectionFiltrationInfo == 
DefaultProjectionFiltrationInfo.INSTANCE) {
+            return NoOpIcebergTableFilterEvaluatorFactory.INSTANCE;
+        } else {
+            //            IcebergTableFilterBuilder builder = new 
IcebergTableFilterBuilder(
+            //                    (ExternalDatasetProjectionFiltrationInfo) 
projectionFiltrationInfo, context, typeEnv);
+            //            return builder.build();
+            return NoOpIcebergTableFilterEvaluatorFactory.INSTANCE;
+        }
+    }
+
    /**
     * Builds the filter evaluator factory for parquet external datasets, combining the prefix-based external
     * filter (when computed-field paths exist) with the parquet row-group filter predicate.
     */
    private static IExternalFilterEvaluatorFactory createParquetFilter(
            IProjectionFiltrationInfo projectionFiltrationInfo, JobGenContext context, IVariableTypeEnvironment typeEnv,
            Map<String, String> properties) throws AlgebricksException {
        if (projectionFiltrationInfo == DefaultProjectionFiltrationInfo.INSTANCE) {
            // NOTE(review): returns the Delta no-op factory for plain parquet — behavior carried over from
            // the pre-refactor code; confirm NoOpExternalFilterEvaluatorFactory was not intended here
            return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
        } else {
            ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
            ExternalDatasetProjectionFiltrationInfo pfi =
                    (ExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo;
            IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory =
                    NoOpExternalFilterEvaluatorFactory.INSTANCE;
            // only build the prefix-based filter when the prefix actually declares computed-field paths
            if (!prefix.getPaths().isEmpty()) {
                ExternalFilterBuilder externalFilterBuilder = new ExternalFilterBuilder(pfi, context, typeEnv, prefix);
                externalFilterEvaluatorFactory = externalFilterBuilder.build();
            }
            ParquetFilterBuilder builder = new ParquetFilterBuilder(
                    (ParquetExternalDatasetProjectionFiltrationInfo) projectionFiltrationInfo, context, typeEnv);
            return new ParquetFilterEvaluatorFactory(externalFilterEvaluatorFactory, builder.buildFilterPredicate());
        }
    }
 
+    private static IExternalFilterEvaluatorFactory createExternalFilter(
+            IProjectionFiltrationInfo projectionFiltrationInfo, JobGenContext 
context, IVariableTypeEnvironment typeEnv,
+            Map<String, String> properties) throws AlgebricksException {
+        if (projectionFiltrationInfo == 
DefaultProjectionFiltrationInfo.INSTANCE) {
+            return NoOpExternalFilterEvaluatorFactory.INSTANCE;
+        } else {
+            ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+            ExternalDatasetProjectionFiltrationInfo pfi =
+                    (ExternalDatasetProjectionFiltrationInfo) 
projectionFiltrationInfo;
+            ExternalFilterBuilder build = new ExternalFilterBuilder(pfi, 
context, typeEnv, prefix);
+            return build.build();
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
index 966bfa8cf8..07f62c9a76 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
@@ -70,10 +70,10 @@ public class MetadataLockUtil implements IMetadataLockUtil {
             String metaItemTypeDatabase, DataverseName 
metaItemTypeDataverseName, String metaItemTypeName,
             boolean metaItemTypeAnonymous, String nodeGroupName, String 
compactionPolicyName,
             boolean isDefaultCompactionPolicy, DatasetConfig.DatasetType 
datasetType, Object datasetDetails,
-            IMetadataProvider metadataProvider) throws AlgebricksException {
+            IMetadataProvider metadataProvider, String catalogName) throws 
AlgebricksException {
         createDatasetBeginPre(lockMgr, locks, database, dataverseName, 
itemTypeDatabase, itemTypeDataverseName,
                 itemTypeName, itemTypeAnonymous, metaItemTypeDatabase, 
metaItemTypeDataverseName, metaItemTypeName,
-                metaItemTypeAnonymous, nodeGroupName, compactionPolicyName, 
isDefaultCompactionPolicy);
+                metaItemTypeAnonymous, nodeGroupName, compactionPolicyName, 
isDefaultCompactionPolicy, catalogName);
         lockMgr.acquireDatasetWriteLock(locks, database, dataverseName, 
datasetName);
     }
 
@@ -81,7 +81,7 @@ public class MetadataLockUtil implements IMetadataLockUtil {
             DataverseName dataverseName, String itemTypeDatabase, 
DataverseName itemTypeDataverseName,
             String itemTypeName, boolean itemTypeAnonymous, String 
metaItemTypeDatabase,
             DataverseName metaItemTypeDataverseName, String metaItemTypeName, 
boolean metaItemTypeAnonymous,
-            String nodeGroupName, String compactionPolicyName, boolean 
isDefaultCompactionPolicy)
+            String nodeGroupName, String compactionPolicyName, boolean 
isDefaultCompactionPolicy, String catalogName)
             throws AlgebricksException {
         lockMgr.acquireDatabaseReadLock(locks, database);
         lockMgr.acquireDataverseReadLock(locks, database, dataverseName);
@@ -114,6 +114,9 @@ public class MetadataLockUtil implements IMetadataLockUtil {
         if (!isDefaultCompactionPolicy) {
             lockMgr.acquireMergePolicyReadLock(locks, compactionPolicyName);
         }
+        if (catalogName != null) {
+            lockMgr.acquireCatalogReadLock(locks, catalogName);
+        }
     }
 
     @Override
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 88b3591806..8d6524fb67 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -102,7 +102,7 @@
     <awsjavasdk.version>2.29.27</awsjavasdk.version>
     <awsjavasdk.crt.version>0.33.3</awsjavasdk.crt.version>
 
-    <parquet.version>1.15.2</parquet.version>
+    <parquet.version>1.16.0</parquet.version>
     <hadoop-awsjavasdk.version>1.12.788</hadoop-awsjavasdk.version>
 
     <azureblobjavasdk.version>12.31.1</azureblobjavasdk.version>
@@ -118,6 +118,8 @@
     <io.opencensus.version>0.31.1</io.opencensus.version>
     <protobuf-java.version>3.25.8</protobuf-java.version>
 
+    <icebergjavasdk.version>1.10.0</icebergjavasdk.version>
+
     <implementation.title>Apache AsterixDB - 
${project.name}</implementation.title>
     <implementation.url>https://asterixdb.apache.org/</implementation.url>
     <implementation.version>${project.version}</implementation.version>
@@ -1925,33 +1927,6 @@
         <artifactId>avro</artifactId>
         <version>1.12.0</version>
       </dependency>
-      <dependency>
-        <groupId>org.apache.iceberg</groupId>
-        <artifactId>iceberg-core</artifactId>
-        <version>1.5.2</version>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.iceberg</groupId>
-        <artifactId>iceberg-data</artifactId>
-        <version>1.5.2</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
-      <dependency>
-        <groupId>org.apache.iceberg</groupId>
-        <artifactId>iceberg-parquet</artifactId>
-        <version>1.5.2</version>
-        <exclusions>
-          <exclusion>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-api</artifactId>
-          </exclusion>
-        </exclusions>
-      </dependency>
       <dependency>
         <groupId>org.apache.parquet</groupId>
         <artifactId>parquet-avro</artifactId>
@@ -2001,6 +1976,33 @@
         <artifactId>aws-crt</artifactId>
         <version>${awsjavasdk.crt.version}</version>
       </dependency>
+
+      <!-- Iceberg -->
+      <dependency>
+        <groupId>org.apache.iceberg</groupId>
+        <artifactId>iceberg-core</artifactId>
+        <version>${icebergjavasdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.iceberg</groupId>
+        <artifactId>iceberg-api</artifactId>
+        <version>${icebergjavasdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.iceberg</groupId>
+        <artifactId>iceberg-data</artifactId>
+        <version>${icebergjavasdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.iceberg</groupId>
+        <artifactId>iceberg-parquet</artifactId>
+        <version>${icebergjavasdk.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.iceberg</groupId>
+        <artifactId>iceberg-aws</artifactId>
+        <version>${icebergjavasdk.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 

Reply via email to