This is an automated email from the ASF dual-hosted git repository.
htowaileb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 6519550a89 [ASTERIXDB-3634][EXT]: Add support to Iceberg pt.2
6519550a89 is described below
commit 6519550a89fc7804a3e0f96982981288b18458a8
Author: Hussain Towaileb <[email protected]>
AuthorDate: Sun Sep 21 16:46:35 2025 +0300
[ASTERIXDB-3634][EXT]: Add support to Iceberg pt.2
Details:
- Supported AWS Glue catalog.
- Supported read path: AWS Glue -> Parquet pipeline.
- Tested types:
- boolean
- integer
- long
- float
- double
- string
- uuid
- decimal (to double)
- Supported projection
- Skeleton for supporting partition pruning and filtering
is in place.
- Custom AWS client providers to support all auth methods
Ext-ref: MB-63115
Change-Id: I531c613fe061367b2cb5c187bc985b39d6de17d1
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20478
Reviewed-by: Hussain Towaileb <[email protected]>
Tested-by: Hussain Towaileb <[email protected]>
---
.../rules/PushValueAccessAndFilterDownRule.java | 2 +
.../IcebergTableFilterPushdownProcessor.java | 64 +++++
.../asterix/app/translator/QueryTranslator.java | 32 ++-
.../handlers/CatalogStatementHandler.java | 4 +-
.../api/common/LocalCloudUtilAdobeMock.java | 9 -
.../catalog/all-supported-types/test.000.ddl.sqlpp | 4 +-
.../asterix/common/exceptions/ErrorCode.java | 3 +
.../asterix/common/metadata/IMetadataLockUtil.java | 2 +-
.../src/main/resources/asx_errormsg/en.properties | 3 +
asterixdb/asterix-external-data/pom.xml | 21 ++
.../adapter/factory/GenericAdapterFactory.java | 13 +
.../IIcebergRecordReaderFactory.java} | 14 +-
.../awsclient/EnsureCloseAWSClientFactory.java | 132 ++++++++++
.../filter/IcebergTableFilterEvaluatorFactory.java | 52 ++++
.../NoOpIcebergTableFilterEvaluatorFactory.java | 46 ++++
.../aws/iceberg/IcebergFileRecordReader.java | 177 +++++++++++++
.../iceberg/IcebergParquetRecordReaderFactory.java | 212 +++++++++++++++
.../iceberg/converter/IcebergConverterContext.java | 107 ++++++++
.../external/parser/IcebergParquetDataParser.java | 292 +++++++++++++++++++++
.../factory/IcebergParserFactory.java} | 19 +-
.../IcebergTableParquetDataParserFactory.java | 64 +++++
.../provider/DatasourceFactoryProvider.java | 19 +-
.../external/provider/ParserFactoryProvider.java | 9 +-
.../external/util/ExternalDataConstants.java | 11 +
.../asterix/external/util/ExternalDataUtils.java | 37 ++-
.../apache/asterix/external/util/HDFSUtils.java | 23 +-
.../asterix/external/util/aws/glue/GlueUtils.java | 48 ++++
.../asterix/external/util/aws/s3/S3Utils.java | 9 +-
.../google/biglake_metastore/BiglakeMetastore.java | 77 ++++++
.../external/util/iceberg/IcebergConstants.java | 15 +-
.../external/util/iceberg/IcebergUtils.java | 114 ++++++++
....apache.asterix.external.api.IDataParserFactory | 3 +-
...pache.asterix.external.api.IRecordReaderFactory | 3 +-
.../metadata/declared/MetadataProvider.java | 22 ++
.../apache/asterix/metadata/utils/DatasetUtil.java | 6 +
.../apache/asterix/metadata/utils/IndexUtil.java | 100 ++++---
.../asterix/metadata/utils/MetadataLockUtil.java | 9 +-
asterixdb/pom.xml | 58 ++--
38 files changed, 1705 insertions(+), 130 deletions(-)
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
index bd9e3293a5..ffc6febd23 100644
---
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/PushValueAccessAndFilterDownRule.java
@@ -34,6 +34,7 @@ import
org.apache.asterix.optimizer.rules.pushdown.processor.ColumnValueAccessPu
import
org.apache.asterix.optimizer.rules.pushdown.processor.ConsolidateProjectionAndFilterExpressionsProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.DeltaTableFilterPushdownProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.ExternalDatasetFilterPushdownProcessor;
+import
org.apache.asterix.optimizer.rules.pushdown.processor.IcebergTableFilterPushdownProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.InlineAndNormalizeFilterExpressionsProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.processor.ParquetFilterPushdownProcessor;
import
org.apache.asterix.optimizer.rules.pushdown.visitor.PushdownOperatorVisitor;
@@ -121,6 +122,7 @@ public class PushValueAccessAndFilterDownRule implements
IAlgebraicRewriteRule {
// Performs prefix pushdowns
pushdownProcessorsExecutor.add(new
ExternalDatasetFilterPushdownProcessor(pushdownContext, context));
pushdownProcessorsExecutor.add(new
DeltaTableFilterPushdownProcessor(pushdownContext, context));
+ pushdownProcessorsExecutor.add(new
IcebergTableFilterPushdownProcessor(pushdownContext, context));
pushdownProcessorsExecutor.add(new
ParquetFilterPushdownProcessor(pushdownContext, context));
pushdownProcessorsExecutor
.add(new
ConsolidateProjectionAndFilterExpressionsProcessor(pushdownContext, context));
diff --git
a/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/IcebergTableFilterPushdownProcessor.java
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/IcebergTableFilterPushdownProcessor.java
new file mode 100644
index 0000000000..03477c5ab0
--- /dev/null
+++
b/asterixdb/asterix-algebra/src/main/java/org/apache/asterix/optimizer/rules/pushdown/processor/IcebergTableFilterPushdownProcessor.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.optimizer.rules.pushdown.processor;
+
+import static org.apache.asterix.metadata.utils.PushdownUtil.ARRAY_FUNCTIONS;
+
+import org.apache.asterix.metadata.utils.DatasetUtil;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.asterix.optimizer.rules.pushdown.PushdownContext;
+import
org.apache.asterix.optimizer.rules.pushdown.descriptor.ScanDefineDescriptor;
+import
org.apache.asterix.optimizer.rules.pushdown.schema.AnyExpectedSchemaNode;
+import
org.apache.asterix.optimizer.rules.pushdown.schema.ExpectedSchemaNodeType;
+import org.apache.asterix.optimizer.rules.pushdown.schema.IExpectedSchemaNode;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.algebricks.core.algebra.base.IOptimizationContext;
+import
org.apache.hyracks.algebricks.core.algebra.expressions.AbstractFunctionCallExpression;
+import org.apache.hyracks.algebricks.core.algebra.functions.FunctionIdentifier;
+
+public class IcebergTableFilterPushdownProcessor extends
ColumnFilterPushdownProcessor {
+
+ public IcebergTableFilterPushdownProcessor(PushdownContext
pushdownContext, IOptimizationContext context) {
+ super(pushdownContext, context);
+ }
+
+ @Override
+ protected boolean skip(ScanDefineDescriptor scanDefineDescriptor) throws
AlgebricksException {
+ return !DatasetUtil.isIcebergTable(scanDefineDescriptor.getDataset());
+ }
+
+ @Override
+ protected boolean isNotPushable(AbstractFunctionCallExpression expression)
{
+ FunctionIdentifier fid = expression.getFunctionIdentifier();
+ return ARRAY_FUNCTIONS.contains(fid) ||
super.isNotPushable(expression);
+ }
+
+ @Override
+ protected FilterBranch handlePath(AbstractFunctionCallExpression
expression, IExpectedSchemaNode node)
+ throws AlgebricksException {
+ if (node.getType() != ExpectedSchemaNodeType.ANY) {
+ return FilterBranch.NA;
+ }
+
+ // The inferred path from the provided expression
+ ARecordType expressionPath =
pathBuilderVisitor.buildPath((AnyExpectedSchemaNode) node);
+ paths.put(expression, expressionPath);
+ return FilterBranch.FILTER_PATH;
+ }
+}
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index bf91785ec4..0946e4a4a3 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -120,6 +120,8 @@ import
org.apache.asterix.external.operators.FeedIntakeOperatorNodePushable;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.WriterValidationUtil;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
import
org.apache.asterix.external.writer.printer.parquet.SchemaConverterVisitor;
import org.apache.asterix.lang.common.base.Expression;
import org.apache.asterix.lang.common.base.IQueryRewriter;
@@ -190,6 +192,7 @@ import org.apache.asterix.lang.common.struct.Identifier;
import org.apache.asterix.lang.common.struct.VarIdentifier;
import org.apache.asterix.lang.common.util.FunctionUtil;
import org.apache.asterix.lang.common.util.LangDatasetUtil;
+import org.apache.asterix.lang.common.util.LangRecordParseUtil;
import org.apache.asterix.lang.common.util.ViewUtil;
import org.apache.asterix.lang.sqlpp.rewrites.SqlppQueryRewriter;
import org.apache.asterix.metadata.IDatasetDetails;
@@ -200,6 +203,7 @@ import
org.apache.asterix.metadata.dataset.DatasetFormatInfo;
import org.apache.asterix.metadata.dataset.hints.DatasetHints;
import
org.apache.asterix.metadata.dataset.hints.DatasetHints.DatasetNodegroupCardinalityHint;
import org.apache.asterix.metadata.declared.MetadataProvider;
+import org.apache.asterix.metadata.entities.Catalog;
import org.apache.asterix.metadata.entities.CompactionPolicy;
import org.apache.asterix.metadata.entities.Database;
import org.apache.asterix.metadata.entities.Dataset;
@@ -877,10 +881,19 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
return;
}
+ String catalogName = null;
+ if (dd.getDatasetType().equals(DatasetType.EXTERNAL)) {
+ ExternalDetailsDecl edd = (ExternalDetailsDecl)
dd.getDatasetDetailsDecl();
+ Map<String, String> properties = new
HashMap<>(edd.getProperties());
+ LangRecordParseUtil.recordToMap(properties, ((DatasetDecl)
stmt).getWithObjectNode());
+ catalogName =
properties.get(IcebergConstants.ICEBERG_CATALOG_NAME);
+ }
+
lockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(),
databaseName, dataverseName, datasetName,
itemTypeDatabase, itemTypeDataverseName, itemTypeName,
itemTypeAnonymous, metaItemTypeDatabase,
metaItemTypeDataverseName, metaItemTypeName,
metaItemTypeAnonymous, nodegroupName, compactionPolicy,
- defaultCompactionPolicy, dd.getDatasetType(),
dd.getDatasetDetailsDecl(), metadataProvider);
+ defaultCompactionPolicy, dd.getDatasetType(),
dd.getDatasetDetailsDecl(), metadataProvider,
+ catalogName);
try {
doCreateDatasetStatement(metadataProvider, dd,
stmtActiveNamespace, datasetName, itemTypeNamespace,
itemTypeExpr, itemTypeName, metaItemTypeExpr,
metaItemTypeNamespace, metaItemTypeName, hcc,
@@ -1061,6 +1074,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
ExternalDataUtils.normalize(properties);
ExternalDataUtils.validate(properties);
ExternalDataUtils.validateType(properties, (ARecordType)
itemType);
+ validateIfIcebergTable(properties, mdTxnCtx, sourceLoc);
validateExternalDatasetProperties(externalDetails,
properties, dd.getSourceLocation(), mdTxnCtx,
appCtx, metadataProvider);
datasetDetails = new
ExternalDatasetDetails(externalDetails.getAdapter(), properties, new Date(),
@@ -1173,6 +1187,20 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
return Optional.of(dataset);
}
+ private void validateIfIcebergTable(Map<String, String> properties,
MetadataTransactionContext mdTxnCtx,
+ SourceLocation srcLoc) throws AlgebricksException {
+ if (!IcebergUtils.isIcebergTable(properties)) {
+ return;
+ }
+
+ // ensure the specified catalog exists
+ String catalogName =
properties.get(IcebergConstants.ICEBERG_CATALOG_NAME);
+ Catalog catalog = MetadataManager.INSTANCE.getCatalog(mdTxnCtx,
catalogName);
+ if (catalog == null) {
+ throw new CompilationException(ErrorCode.UNKNOWN_CATALOG, srcLoc,
catalogName);
+ }
+ }
+
protected boolean isDatasetWithoutTypeSpec(DatasetDecl datasetDecl,
ARecordType aRecordType,
ARecordType metaRecType) {
return aRecordType.getFieldNames().length == 0 && metaRecType == null;
@@ -2927,7 +2955,7 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
}
lockUtil.createDatasetBegin(lockManager, metadataProvider.getLocks(),
databaseName, dataverseName, viewName,
itemTypeDatabaseName, viewItemTypeDataverseName,
viewItemTypeName, viewItemTypeAnonymous, null, null,
- null, false, null, null, true, DatasetType.VIEW, null,
metadataProvider);
+ null, false, null, null, true, DatasetType.VIEW, null,
metadataProvider, null);
try {
doCreateView(metadataProvider, cvs, databaseName, dataverseName,
viewName, itemTypeDatabaseName,
viewItemTypeDataverseName, viewItemTypeName, stmtRewriter,
requestParameters, creator);
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
index bba267c53d..8c33407e44 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/handlers/CatalogStatementHandler.java
@@ -82,7 +82,7 @@ public class CatalogStatementHandler {
handleDrop();
return;
default:
- throw new IllegalStateException("Catalog handler handling
non-catalog statements");
+ throw new IllegalStateException("Catalog statement handler
handling non-catalog statement: " + kind);
}
}
@@ -250,7 +250,7 @@ public class CatalogStatementHandler {
protected void validateIcebergCatalogProperties(CatalogCreateStatement
statement,
MetadataTransactionContext mdTxnCtx, MetadataProvider
metadataProvider) throws AlgebricksException {
IcebergCatalogCreateStatement icebergStatement =
(IcebergCatalogCreateStatement) statement;
- IcebergCatalogDetailsDecl details = (IcebergCatalogDetailsDecl)
icebergStatement.getCatalogDetailsDecl();
+ IcebergCatalogDetailsDecl details =
icebergStatement.getCatalogDetailsDecl();
Map<String, String> allProperties = new
HashMap<>(details.getProperties());
LangRecordParseUtil.recordToMap(allProperties,
icebergStatement.getWithObjectNode());
IcebergUtils.validateCatalogProperties(allProperties);
diff --git
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtilAdobeMock.java
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtilAdobeMock.java
index 8509afff0f..be83b09e43 100644
---
a/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtilAdobeMock.java
+++
b/asterixdb/asterix-app/src/test/java/org/apache/asterix/api/common/LocalCloudUtilAdobeMock.java
@@ -140,13 +140,4 @@ public class LocalCloudUtilAdobeMock {
s3Mock = null;
}
}
-
- public static S3Client getS3Client() {
- S3ClientBuilder builder = S3Client.builder();
- URI endpoint = URI.create(s3Mock.getHttpEndpoint());
-
builder.region(Region.of(MOCK_SERVER_REGION)).credentialsProvider(AnonymousCredentialsProvider.create())
- .endpointOverride(endpoint);
- S3Client client = builder.build();
- return client;
- }
}
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/catalog/all-supported-types/test.000.ddl.sqlpp
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/catalog/all-supported-types/test.000.ddl.sqlpp
index c20f27d9ef..657e2676ae 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/catalog/all-supported-types/test.000.ddl.sqlpp
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/queries_sqlpp/iceberg/catalog/all-supported-types/test.000.ddl.sqlpp
@@ -32,7 +32,7 @@ CREATE CATALOG myRestCatalog
TYPE Iceberg
SOURCE REST
WITH {
- "warehouse": "s3://cbas-iceberg-playground/my-glue-catalog/warehouse/",
+ "warehouse": "s3://cbas-iceberg-playground/my-rest-catalog/warehouse/",
"namespace": "pharmacy_namespace",
"accessKeyId" : "myAccessKeyId",
"secretAccessKey" : "mySecretAccessKey",
@@ -43,7 +43,7 @@ CREATE CATALOG myBiglakeMetastoreCatalog
TYPE Iceberg
SOURCE BIGLAKE_METASTORE
WITH {
- "warehouse": "s3://cbas-iceberg-playground/my-glue-catalog/warehouse/",
+ "warehouse":
"s3://cbas-iceberg-playground/my-biglake-metastore-catalog/warehouse/",
"namespace": "pharmacy_namespace",
"accessKeyId" : "myAccessKeyId",
"secretAccessKey" : "mySecretAccessKey",
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
index 6a3df32f8c..29aa4a56fc 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/exceptions/ErrorCode.java
@@ -340,6 +340,9 @@ public enum ErrorCode implements IError {
UPDATE_ATTEMPT_ON_CONFLICTING_PATHS(1232),
UPDATE_SET_EXPR_MISSING_ON_ROOT_PATH(1233),
UPDATE_TARGET_NOT_LIST(1234),
+ ICEBERG_NAMESPACE_DOES_NOT_EXIST(1235),
+ ICEBERG_TABLE_DOES_NOT_EXIST(1236),
+ UNSUPPORTED_ICEBERG_DATA_FORMAT(1237),
// Feed errors
DATAFLOW_ILLEGAL_STATE(3001),
diff --git
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
index ecd78724e9..07a1442d1b 100644
---
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
+++
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/metadata/IMetadataLockUtil.java
@@ -52,7 +52,7 @@ public interface IMetadataLockUtil {
String metaItemTypeDatabase, DataverseName
metaItemTypeDataverseName, String metaItemTypeName,
boolean metaItemTypeAnonymous, String nodeGroupName, String
compactionPolicyName,
boolean isDefaultCompactionPolicy, DatasetConfig.DatasetType
datasetType, Object datasetDetails,
- IMetadataProvider metadataProvider) throws AlgebricksException;
+ IMetadataProvider metadataProvider, String catalogName) throws
AlgebricksException;
void dropDatasetBegin(IMetadataLockManager lockManager, LockList locks,
String database,
DataverseName dataverseName, String datasetName) throws
AlgebricksException;
diff --git
a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
index 5a2f5439fe..607eb0b576 100644
--- a/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
+++ b/asterixdb/asterix-common/src/main/resources/asx_errormsg/en.properties
@@ -342,6 +342,9 @@
1232 = Cannot update both parent and child fields in the same UPDATE statement
1233 = UPDATE statement cannot set entire record to null. Use DELETE statement
to remove records
1234 = Invalid update target, expected a list, but found: %s
+1235 = The provided Iceberg namespace '%1$s' does not exist
+1236 = The provided Iceberg table '%1$s' does not exist
+1237 = The provided data format '%1$s' is not supported for Iceberg tables
# Feed Errors
3001 = Illegal state.
diff --git a/asterixdb/asterix-external-data/pom.xml
b/asterixdb/asterix-external-data/pom.xml
index 5105e19346..c9ff38d2a4 100644
--- a/asterixdb/asterix-external-data/pom.xml
+++ b/asterixdb/asterix-external-data/pom.xml
@@ -479,6 +479,14 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
</dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>dynamodb</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>software.amazon.awssdk</groupId>
+ <artifactId>kms</artifactId>
+ </dependency>
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob</artifactId>
@@ -628,6 +636,19 @@
<artifactId>hyracks-cloud</artifactId>
<version>0.3.10-SNAPSHOT</version>
</dependency>
+ <!-- Iceberg -->
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-aws</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-parquet</artifactId>
+ </dependency>
</dependencies>
<!-- apply patch for HADOOP-17225 to workaround CVE-2019-10172 -->
<repositories>
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
index ecc7833e5d..078c7ca49f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/adapter/factory/GenericAdapterFactory.java
@@ -31,11 +31,13 @@ import org.apache.asterix.common.metadata.DataverseName;
import org.apache.asterix.external.api.IDataFlowController;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.api.IExternalDataSourceFactory;
+import org.apache.asterix.external.api.IIcebergRecordReaderFactory;
import org.apache.asterix.external.api.ITypedAdapterFactory;
import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
import org.apache.asterix.external.dataset.adapter.FeedAdapter;
import org.apache.asterix.external.dataset.adapter.GenericAdapter;
import
org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.parser.factory.IcebergParserFactory;
import org.apache.asterix.external.provider.DataflowControllerProvider;
import org.apache.asterix.external.provider.DatasourceFactoryProvider;
import org.apache.asterix.external.provider.ParserFactoryProvider;
@@ -46,6 +48,7 @@ import org.apache.asterix.external.util.FeedLogManager;
import org.apache.asterix.external.util.FeedUtils;
import org.apache.asterix.external.util.IFeedLogManager;
import org.apache.asterix.external.util.NoOpFeedLogManager;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
import org.apache.asterix.om.types.ARecordType;
import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
@@ -144,6 +147,7 @@ public class GenericAdapterFactory implements
ITypedAdapterFactory {
dataSourceFactory.configure(serviceContext, configuration,
warningCollector, filterEvaluatorFactory);
ExternalDataUtils.validateDataParserParameters(configuration);
dataParserFactory = createDataParserFactory(configuration);
+ setProjectedSchemaIfIcebergTable(configuration, dataSourceFactory,
dataParserFactory);
dataParserFactory.setRecordType(recordType);
dataParserFactory.setMetaType(metaType);
dataParserFactory.configure(configuration);
@@ -152,6 +156,15 @@ public class GenericAdapterFactory implements
ITypedAdapterFactory {
nullifyExternalObjects();
}
+ private void setProjectedSchemaIfIcebergTable(Map<String, String>
configuration,
+ IExternalDataSourceFactory dataSourceFactory, IDataParserFactory
dataParserFactory) {
+ if (IcebergUtils.isIcebergTable(configuration)) {
+ IIcebergRecordReaderFactory readerFactory =
(IIcebergRecordReaderFactory) dataSourceFactory;
+ IcebergParserFactory parserFactory = (IcebergParserFactory)
dataParserFactory;
+
parserFactory.setProjectedSchema(readerFactory.getProjectedSchema());
+ }
+ }
+
private void configureFeedLogManager(ICcApplicationContext appCtx)
throws HyracksDataException, AlgebricksException {
this.isFeed = ExternalDataUtils.isFeed(configuration);
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIcebergRecordReaderFactory.java
similarity index 63%
copy from
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
copy to
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIcebergRecordReaderFactory.java
index c5b5da3f36..6d1b1c2754 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/api/IIcebergRecordReaderFactory.java
@@ -16,15 +16,11 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.util.iceberg;
+package org.apache.asterix.external.api;
-public class IcebergConstants {
- private IcebergConstants() {
- throw new AssertionError("do not instantiate");
- }
+import org.apache.iceberg.Schema;
- public static final String ICEBERG_TABLE_FORMAT = "iceberg";
- public static final String APACHE_ICEBERG_TABLE_FORMAT = "apache-iceberg";
- public static final String ICEBERG_SOURCE_PROPERTY_KEY = "catalogSource";
- public static final String ICEBERG_WAREHOUSE_PROPERTY_KEY = "warehouse";
+public interface IIcebergRecordReaderFactory<T> extends
IRecordReaderFactory<T> {
+
+ Schema getProjectedSchema();
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/awsclient/EnsureCloseAWSClientFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/awsclient/EnsureCloseAWSClientFactory.java
new file mode 100644
index 0000000000..5f465b6fca
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/awsclient/EnsureCloseAWSClientFactory.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.awsclient;
+
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.Method;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.external.util.aws.AwsUtils;
+import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
+import org.apache.asterix.external.util.aws.glue.GlueUtils;
+import org.apache.asterix.external.util.aws.s3.S3Utils;
+import org.apache.iceberg.aws.AwsClientFactory;
+
+import software.amazon.awssdk.services.glue.GlueClient;
+import software.amazon.awssdk.services.kms.KmsClient;
+import software.amazon.awssdk.services.s3.S3AsyncClient;
+import software.amazon.awssdk.services.s3.S3Client;
+
+/**
+ * Custom AWS client factory that ensures assume-role resources (STS client +
credentials provider)
+ * are closed when the Glue catalog is closed. Achieved by returning a proxy
GlueClient with a
+ * close() method that performs the extra cleanup.
+ */
+public class EnsureCloseAWSClientFactory implements AwsClientFactory {
+ private static final long serialVersionUID = 1L;
+
+ private Map<String, String> properties;
+ private CloseableAwsClients s3Clients;
+ private CloseableAwsClients glueClients;
+ private GlueClient proxyGlueClient; // cached proxy
+
+ @Override
+ public void initialize(Map<String, String> properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public S3Client s3() {
+ ensureS3Clients();
+ return (S3Client) s3Clients.getConsumingClient();
+ }
+
+ @Override
+ public S3AsyncClient s3Async() {
+ return null;
+ }
+
+ @Override
+ public GlueClient glue() {
+ ensureGlueClients();
+ return (GlueClient) glueClients.getConsumingClient();
+ }
+
+ @Override
+ public KmsClient kms() {
+ return null;
+ }
+
+ @Override
+ public software.amazon.awssdk.services.dynamodb.DynamoDbClient dynamo() {
+ return null;
+ }
+
+ private void ensureS3Clients() {
+ if (s3Clients == null) {
+ try {
+ s3Clients = S3Utils.buildClient(null, properties);
+ } catch (CompilationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ private void ensureGlueClients() {
+ if (glueClients == null) {
+ try {
+ glueClients = GlueUtils.buildClient(null, properties);
+ } catch (CompilationException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * Invocation handler that intercepts close() to also close STS +
credentials provider.
+ */
+ private class GlueClientInvocationHandler implements InvocationHandler {
+ private final GlueClient delegate;
+ private volatile boolean closed = false;
+
+ GlueClientInvocationHandler(GlueClient delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
+ String name = method.getName();
+ if ("close".equals(name)) {
+ if (!closed) {
+ closed = true;
+ try {
+ delegate.close();
+ } finally {
+ // Close both sets; no-op if not built or already
closed
+ AwsUtils.closeClients(glueClients);
+ AwsUtils.closeClients(s3Clients);
+ }
+ }
+ return null;
+ }
+ return method.invoke(delegate, args);
+ }
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/IcebergTableFilterEvaluatorFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/IcebergTableFilterEvaluatorFactory.java
new file mode 100644
index 0000000000..da40847b4e
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/IcebergTableFilterEvaluatorFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.iceberg.expressions.Expression;
+
+public class IcebergTableFilterEvaluatorFactory implements
IExternalFilterEvaluatorFactory {
+ private static final long serialVersionUID = 1L;
+ private final Expression filterExpression;
+
+ public IcebergTableFilterEvaluatorFactory(Expression expression) {
+ this.filterExpression = expression;
+ }
+
+ @Override
+ public IExternalFilterEvaluator create(IServiceContext serviceContext,
IWarningCollector warningCollector)
+ throws HyracksDataException {
+ return null;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector
warningCollector) {
+ return NoOpFilterValueEmbedder.INSTANCE;
+ }
+
+ public Expression getFilterExpression() {
+ return filterExpression;
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpIcebergTableFilterEvaluatorFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpIcebergTableFilterEvaluatorFactory.java
new file mode 100644
index 0000000000..0a18c2ac9f
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/filter/NoOpIcebergTableFilterEvaluatorFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.filter;
+
+import org.apache.asterix.common.external.IExternalFilterEvaluator;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+
+public class NoOpIcebergTableFilterEvaluatorFactory extends
IcebergTableFilterEvaluatorFactory {
+ public static final IExternalFilterEvaluatorFactory INSTANCE = new
NoOpIcebergTableFilterEvaluatorFactory();
+ private static final long serialVersionUID = 1L;
+
+ private NoOpIcebergTableFilterEvaluatorFactory() {
+ super(null);
+ }
+
+ @Override
+ public IExternalFilterEvaluator create(IServiceContext serviceContext,
IWarningCollector warningCollector) {
+ return NoOpExternalFilterEvaluator.INSTANCE;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public IExternalFilterValueEmbedder createValueEmbedder(IWarningCollector
warningCollector) {
+ return NoOpFilterValueEmbedder.INSTANCE;
+ }
+
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
new file mode 100644
index 0000000000..9f779b8c7e
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergFileRecordReader.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.iceberg;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.dataflow.AbstractFeedDataFlowController;
+import org.apache.asterix.external.input.record.GenericRecord;
+import org.apache.asterix.external.util.IFeedLogManager;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.parquet.Parquet;
+
+/**
+ * Iceberg record reader.
+ * The reader returns records in Iceberg Record format.
+ */
+public class IcebergFileRecordReader implements IRecordReader<Record> {
+
+ private final List<FileScanTask> fileScanTasks;
+ private final Schema projectedSchema;
+ private final Map<String, String> configuration;
+ private final IRawRecord<Record> record;
+
+ private int nextTaskIndex = 0;
+ private Catalog catalog;
+ private FileIO tableFileIo;
+ private CloseableIterable<Record> iterable;
+ private Iterator<Record> recordsIterator;
+
+ public IcebergFileRecordReader(List<FileScanTask> fileScanTasks, Schema
projectedSchema,
+ Map<String, String> configuration) throws HyracksDataException {
+ this.fileScanTasks = fileScanTasks;
+ this.projectedSchema = projectedSchema;
+ this.configuration = configuration;
+ this.record = new GenericRecord<>();
+
+ try {
+ initializeTable();
+ } catch (CompilationException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ private void initializeTable() throws CompilationException {
+ if (fileScanTasks.isEmpty()) {
+ return;
+ }
+
+ String namespace = IcebergUtils.getNamespace(configuration);
+ String tableName =
configuration.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
+ catalog =
IcebergUtils.initializeCatalog(IcebergUtils.filterCatalogProperties(configuration),
namespace);
+ TableIdentifier tableIdentifier =
TableIdentifier.of(Namespace.of(namespace), tableName);
+ if (!catalog.tableExists(tableIdentifier)) {
+ throw
CompilationException.create(ErrorCode.ICEBERG_TABLE_DOES_NOT_EXIST, tableName);
+ }
+ Table table = catalog.loadTable(tableIdentifier);
+ tableFileIo = table.io();
+ }
+
+ public boolean hasNext() throws Exception {
+ // iterator has more records
+ if (recordsIterator != null && recordsIterator.hasNext()) {
+ return true;
+ }
+
+ // go to next task
+ // if a file is empty, we will go to the next task
+ while (nextTaskIndex < fileScanTasks.size()) {
+
+ // close previous iterable
+ if (iterable != null) {
+ iterable.close();
+ iterable = null;
+ }
+
+ // Load next task
+ setNextRecordsIterator();
+
+ // if the new iterator has rows → good
+ if (recordsIterator != null && recordsIterator.hasNext()) {
+ return true;
+ }
+
+ // else: this task is empty → continue the loop to the next task
+ }
+
+ // no more tasks & no more rows
+ return false;
+ }
+
+ @Override
+ public IRawRecord<Record> next() throws IOException, InterruptedException {
+ Record icebergRecord = recordsIterator.next();
+ record.set(icebergRecord);
+ return record;
+ }
+
+ @Override
+ public boolean stop() {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (tableFileIo != null) {
+ tableFileIo.close();
+ }
+ if (iterable != null) {
+ iterable.close();
+ }
+
+ try {
+ IcebergUtils.closeCatalog(catalog);
+ } catch (CompilationException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public void setController(AbstractFeedDataFlowController controller) {
+
+ }
+
+ @Override
+ public void setFeedLogManager(IFeedLogManager feedLogManager) throws
HyracksDataException {
+
+ }
+
+ @Override
+ public boolean handleException(Throwable th) {
+ return false;
+ }
+
+ private void setNextRecordsIterator() {
+ FileScanTask task = fileScanTasks.get(nextTaskIndex++);
+ InputFile inFile = tableFileIo.newInputFile(task.file().location());
+ iterable =
Parquet.read(inFile).project(projectedSchema).split(task.start(), task.length())
+ .createReaderFunc(fs ->
GenericParquetReaders.buildReader(projectedSchema, fs)).build();
+ recordsIterator = iterable.iterator();
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
new file mode 100644
index 0000000000..f9af1d153f
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/IcebergParquetRecordReaderFactory.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.iceberg;
+
+import static
org.apache.asterix.external.util.iceberg.IcebergUtils.getProjectedFields;
+import static
org.apache.asterix.external.util.iceberg.IcebergUtils.setSnapshot;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+
+import org.apache.asterix.common.dataflow.ICcApplicationContext;
+import org.apache.asterix.common.exceptions.AsterixException;
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IIcebergRecordReaderFactory;
+import org.apache.asterix.external.api.IRecordReader;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
+import
org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint;
+import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
+import org.apache.hyracks.api.application.IServiceContext;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.IWarningCollector;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+
+public class IcebergParquetRecordReaderFactory implements
IIcebergRecordReaderFactory<Record> {
+
+ private static final long serialVersionUID = 1L;
+ private static final List<String> RECORD_READER_NAMES =
+
Collections.singletonList(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3);
+
+ private final List<FileScanTask> fileScanTasks = new ArrayList<>();
+ private final List<PartitionWorkLoadBasedOnSize>
partitionWorkLoadsBasedOnSize = new ArrayList<>();
+
+ private Schema projectedSchema;
+ private Map<String, String> configurationCopy;
+
+ private transient AlgebricksAbsolutePartitionConstraint
partitionConstraint;
+
+ public IcebergParquetRecordReaderFactory() {
+ }
+
+ @Override
+ public Class<?> getRecordClass() throws AsterixException {
+ return Record.class;
+ }
+
+ @Override
+ public List<String> getRecordReaderNames() {
+ return RECORD_READER_NAMES;
+ }
+
+ @Override
+ public AlgebricksAbsolutePartitionConstraint getPartitionConstraint() {
+ return partitionConstraint;
+ }
+
+ private int getPartitionsCount() {
+ return getPartitionConstraint().getLocations().length;
+ }
+
+ @Override
+ public IRecordReader<Record>
createRecordReader(IExternalDataRuntimeContext context) throws
HyracksDataException {
+ try {
+ int partition = context.getPartition();
+ return new
IcebergFileRecordReader(partitionWorkLoadsBasedOnSize.get(partition).getFileScanTasks(),
+ projectedSchema, new HashMap<>(configurationCopy));
+ } catch (Exception e) {
+ throw HyracksDataException.create(e);
+ }
+ }
+
+ @Override
+ public Set<String> getReaderSupportedFormats() {
+ return Collections.singleton(IcebergConstants.ICEBERG_PARQUET_FORMAT);
+ }
+
+ @Override
+ public void configure(IServiceContext ctx, Map<String, String>
configuration, IWarningCollector warningCollector,
+ IExternalFilterEvaluatorFactory filterEvaluatorFactory) throws
AlgebricksException, HyracksDataException {
+ this.configurationCopy = new HashMap<>(configuration);
+ this.partitionConstraint = ((ICcApplicationContext)
ctx.getApplicationContext()).getDataPartitioningProvider()
+ .getClusterLocations();
+
+ Catalog catalog = null;
+ Throwable throwable = null;
+ try {
+ String namespace = IcebergUtils.getNamespace(configuration);
+ String tableName =
configuration.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
+
+ catalog =
IcebergUtils.initializeCatalog(IcebergUtils.filterCatalogProperties(configuration),
namespace);
+ TableIdentifier tableIdentifier =
TableIdentifier.of(Namespace.of(namespace), tableName);
+ if (!catalog.tableExists(tableIdentifier)) {
+ throw
CompilationException.create(ErrorCode.ICEBERG_TABLE_DOES_NOT_EXIST, tableName);
+ }
+
+ Table table = catalog.loadTable(tableIdentifier);
+ TableScan scan = table.newScan();
+ scan = setSnapshot(configuration, scan);
+ String[] projectedFields = getProjectedFields(configuration);
+ projectedSchema = table.schema();
+ if (projectedFields != null && projectedFields.length > 0) {
+ projectedSchema = projectedSchema.select(projectedFields);
+ }
+ scan.project(projectedSchema);
+ try (CloseableIterable<FileScanTask> tasks = scan.planFiles()) {
+ tasks.forEach(fileScanTasks::add);
+ }
+ distributeWorkLoad(fileScanTasks, getPartitionsCount());
+ } catch (CompilationException ex) {
+ throwable = ex;
+ throw ex;
+ } catch (Exception ex) {
+ throwable = ex;
+ throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, ex.getMessage());
+ } finally {
+ try {
+ IcebergUtils.closeCatalog(catalog);
+ } catch (Exception ex) {
+ if (throwable != null) {
+ throwable.addSuppressed(ex);
+ } else {
+ throw
CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR, ex,
ex.getMessage());
+ }
+ }
+ }
+ }
+
+ private void distributeWorkLoad(List<FileScanTask> fileScanTasks, int
partitionsCount) {
+ PriorityQueue<PartitionWorkLoadBasedOnSize> workloadQueue = new
PriorityQueue<>(partitionsCount,
+
Comparator.comparingLong(PartitionWorkLoadBasedOnSize::getTotalSize));
+
+ // Prepare the workloads based on the number of partitions
+ for (int i = 0; i < partitionsCount; i++) {
+ workloadQueue.add(new PartitionWorkLoadBasedOnSize());
+ }
+
+ for (FileScanTask fileScanTask : fileScanTasks) {
+ PartitionWorkLoadBasedOnSize workload = workloadQueue.poll();
+ workload.addFileScanTask(fileScanTask, fileScanTask.length());
+ workloadQueue.add(workload);
+ }
+ partitionWorkLoadsBasedOnSize.addAll(workloadQueue);
+ }
+
+ @Override
+ public Schema getProjectedSchema() {
+ return projectedSchema;
+ }
+
+ public static class PartitionWorkLoadBasedOnSize implements Serializable {
+ private static final long serialVersionUID = 3L;
+ private final List<FileScanTask> fileScanTasks = new ArrayList<>();
+ private long totalSize = 0;
+
+ public PartitionWorkLoadBasedOnSize() {
+ }
+
+ public List<FileScanTask> getFileScanTasks() {
+ return fileScanTasks;
+ }
+
+ public void addFileScanTask(FileScanTask task, long size) {
+ this.fileScanTasks.add(task);
+ this.totalSize += size;
+ }
+
+ public long getTotalSize() {
+ return totalSize;
+ }
+
+ @Override
+ public String toString() {
+ return "PartitionWorkLoadBasedOnSize{" + "fileScanTasks=" +
fileScanTasks + ", totalSize=" + totalSize
+ + '}';
+ }
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java
new file mode 100644
index 0000000000..aee5616f19
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/iceberg/converter/IcebergConverterContext.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.iceberg.converter;
+
+import java.io.DataOutput;
+import java.util.List;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+
+public class IcebergConverterContext extends ParserContext {
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADate> dateSerDer =
+
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+ @SuppressWarnings("unchecked")
+ private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+ private final boolean decimalToDouble;
+ private final boolean timestampAsLong;
+ private final boolean dateAsInt;
+
+ private final int timeZoneOffset;
+ private final AMutableDate mutableDate = new AMutableDate(0);
+ private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+ private final List<Warning> warnings;
+
+ public IcebergConverterContext(Map<String, String> configuration,
List<Warning> warnings) {
+ this.warnings = warnings;
+ decimalToDouble = Boolean.parseBoolean(configuration
+
.getOrDefault(ExternalDataConstants.IcebergOptions.DECIMAL_TO_DOUBLE,
ExternalDataConstants.FALSE));
+ timestampAsLong = Boolean.parseBoolean(configuration
+
.getOrDefault(ExternalDataConstants.IcebergOptions.TIMESTAMP_AS_LONG,
ExternalDataConstants.TRUE));
+ dateAsInt =
Boolean.parseBoolean(configuration.getOrDefault(ExternalDataConstants.IcebergOptions.DATE_AS_INT,
+ ExternalDataConstants.TRUE));
+ String configuredTimeZoneId =
configuration.get(ExternalDataConstants.IcebergOptions.TIMEZONE);
+ if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
+ timeZoneOffset =
TimeZone.getTimeZone(configuredTimeZoneId).getRawOffset();
+ } else {
+ timeZoneOffset = 0;
+ }
+ }
+
+ public void serializeDate(int value, DataOutput output) {
+ try {
+ mutableDate.setValue(value);
+ dateSerDer.serialize(mutableDate, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public void serializeDateTime(long timestamp, DataOutput output) {
+ try {
+ mutableDateTime.setValue(timestamp);
+ datetimeSerDer.serialize(mutableDateTime, output);
+ } catch (HyracksDataException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public boolean isDecimalToDoubleEnabled() {
+ return decimalToDouble;
+ }
+
+ public int getTimeZoneOffset() {
+ return timeZoneOffset;
+ }
+
+ public boolean isTimestampAsLong() {
+ return timestampAsLong;
+ }
+
+ public boolean isDateAsInt() {
+ return dateAsInt;
+ }
+
+ public List<Warning> getWarnings() {
+ return warnings;
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
new file mode 100644
index 0000000000..634b44d606
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser;
+
+import static
org.apache.asterix.om.pointables.base.DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE;
+import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
+
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.asterix.builders.IARecordBuilder;
+import org.apache.asterix.builders.IAsterixListBuilder;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.common.exceptions.RuntimeDataException;
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRawRecord;
+import org.apache.asterix.external.api.IRecordDataParser;
+import
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
+import
org.apache.asterix.external.input.record.reader.aws.iceberg.converter.IcebergConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.om.base.ABoolean;
+import org.apache.asterix.om.base.ANull;
+import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
+import org.apache.asterix.om.types.ATypeTag;
+import org.apache.avro.AvroRuntimeException;
+import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+import org.apache.hyracks.api.exceptions.Warning;
+import org.apache.hyracks.data.std.api.IMutableValueStorage;
+import org.apache.hyracks.data.std.api.IValueReference;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.types.Types.NestedField;
+
+public class IcebergParquetDataParser extends AbstractDataParser implements
IRecordDataParser<Record> {
+ private final IcebergConverterContext parserContext;
+ private final IExternalFilterValueEmbedder valueEmbedder;
+ private final Schema projectedSchema;
+
+ public IcebergParquetDataParser(IExternalDataRuntimeContext context,
Map<String, String> conf,
+ Schema projectedSchema) {
+ List<Warning> warnings = new ArrayList<>();
+ parserContext = new IcebergConverterContext(conf, warnings);
+ valueEmbedder = context.getValueEmbedder();
+ this.projectedSchema = projectedSchema;
+ }
+
+ @Override
+ public boolean parse(IRawRecord<? extends Record> record, DataOutput out)
throws HyracksDataException {
+ try {
+ parseObject(record.get(), out);
+ valueEmbedder.reset();
+ return true;
+ } catch (AvroRuntimeException | IOException e) {
+ throw RuntimeDataException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
e, getMessageOrToString(e));
+ }
+ }
+
+ private void parseObject(Record record, DataOutput out) throws IOException
{
+ IMutableValueStorage valueBuffer = parserContext.enterObject();
+ IARecordBuilder objectBuilder =
parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ valueEmbedder.enterObject();
+ for (int i = 0; i < projectedSchema.columns().size(); i++) {
+ NestedField field = projectedSchema.columns().get(i);
+ String fieldName = field.name();
+ Type fieldType = field.type();
+ ATypeTag typeTag = getTypeTag(fieldType, record.get(i) == null,
parserContext);
+ IValueReference value;
+ if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
+ value = valueEmbedder.getEmbeddedValue();
+ } else {
+ valueBuffer.reset();
+ parseValue(fieldType, record, i, valueBuffer.getDataOutput());
+ value = valueBuffer;
+ }
+
+ if (value != null) {
+ // Ignore missing values
+
objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), value);
+ }
+ }
+
+ embedMissingValues(objectBuilder, parserContext, valueEmbedder);
+ objectBuilder.write(out, true);
+ valueEmbedder.exitObject();
+ parserContext.exitObject(valueBuffer, null, objectBuilder);
+ }
+
+ private void parseArray(Type arrayType, boolean isOptional, List<?>
listValues, DataOutput out) throws IOException {
+ if (listValues == null) {
+ nullSerde.serialize(ANull.NULL, out);
+ return;
+ }
+ final IMutableValueStorage valueBuffer =
parserContext.enterCollection();
+ final IAsterixListBuilder arrayBuilder =
parserContext.getCollectionBuilder(NESTED_OPEN_AORDERED_LIST_TYPE);
+ for (int i = 0; i < listValues.size(); i++) {
+ valueBuffer.reset();
+ //parseValue(elementSchema, elements, i,
valueBuffer.getDataOutput());
+ arrayBuilder.addItem(valueBuffer);
+ }
+ arrayBuilder.write(out, true);
+ parserContext.exitCollection(valueBuffer, arrayBuilder);
+ }
+
+ public static ATypeTag getTypeTag(Type type, boolean isNull,
IcebergConverterContext parserContext)
+ throws HyracksDataException {
+ if (isNull) {
+ return ATypeTag.NULL;
+ }
+
+ switch (type.typeId()) {
+ case BOOLEAN:
+ return ATypeTag.BOOLEAN;
+ case INTEGER:
+ case LONG:
+ return ATypeTag.BIGINT;
+ case FLOAT:
+ return ATypeTag.FLOAT;
+ case DOUBLE:
+ return ATypeTag.DOUBLE;
+ case STRING:
+ return ATypeTag.STRING;
+ case UUID:
+ return ATypeTag.UUID;
+ case BINARY:
+ return ATypeTag.BINARY;
+ case DECIMAL:
+ ensureDecimalToDoubleEnabled(type, parserContext);
+ return ATypeTag.DOUBLE;
+ case STRUCT:
+ return ATypeTag.OBJECT;
+ case LIST:
+ return ATypeTag.ARRAY;
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ case TIMESTAMP_NANO:
+ case FIXED:
+ case GEOMETRY:
+ case GEOGRAPHY:
+ case MAP:
+ case VARIANT:
+ case UNKNOWN:
+ throw new NotImplementedException();
+ default:
+ throw createUnsupportedException(type);
+
+ }
+ }
+
+ private void parseValue(Type fieldType, Record record, int index,
DataOutput out) throws IOException {
+ Object value = record.get(index);
+ if (value == null) {
+ nullSerde.serialize(ANull.NULL, out);
+ return;
+ }
+
+ switch (fieldType.typeId()) {
+ case BOOLEAN:
+ booleanSerde.serialize((Boolean) value ? ABoolean.TRUE :
ABoolean.FALSE, out);
+ return;
+ case INTEGER:
+ serializeInteger(value, out);
+ return;
+ case LONG:
+ serializeLong(value, out);
+ return;
+ case FLOAT:
+ // TODO: should this be parsed as double?
+ serializeFloat(value, out);
+ return;
+ case DOUBLE:
+ serializeDouble(value, out);
+ return;
+ case STRING:
+ serializeString(value, out);
+ return;
+ case UUID:
+ serializeUuid(value, out);
+ return;
+ case BINARY:
+ serializeBinary(value, out);
+ return;
+ case DECIMAL:
+ ensureDecimalToDoubleEnabled(fieldType, parserContext);
+ serializeDecimal((BigDecimal) value, out);
+ return;
+ case STRUCT:
+ parseObject((Record) value, out);
+ return;
+ case LIST:
+ Types.ListType listType = fieldType.asListType();
+ parseArray(listType.elementType(),
listType.isElementOptional(), (List<?>) value, out);
+ return;
+ case DATE:
+ case TIME:
+ case TIMESTAMP:
+ case TIMESTAMP_NANO:
+ case FIXED:
+ case GEOMETRY:
+ case GEOGRAPHY:
+ case MAP:
+ case VARIANT:
+ case UNKNOWN:
+ throw new NotImplementedException();
+
+ }
+ }
+
+ private void serializeInteger(Object value, DataOutput out) throws
HyracksDataException {
+ int intValue = (Integer) value;
+ aInt64.setValue(intValue);
+ int64Serde.serialize(aInt64, out);
+ }
+
+ private void serializeLong(Object value, DataOutput out) throws
HyracksDataException {
+ long longValue = (Long) value;
+ aInt64.setValue(longValue);
+ int64Serde.serialize(aInt64, out);
+ }
+
+ private void serializeFloat(Object value, DataOutput out) throws
HyracksDataException {
+ float floatValue = (Float) value;
+ aFloat.setValue(floatValue);
+ floatSerde.serialize(aFloat, out);
+ }
+
+ private void serializeDouble(Object value, DataOutput out) throws
HyracksDataException {
+ double doubleValue = (Double) value;
+ aDouble.setValue(doubleValue);
+ doubleSerde.serialize(aDouble, out);
+ }
+
+ private void serializeUuid(Object value, DataOutput out) throws
HyracksDataException {
+ UUID uuid = (UUID) value;
+ String uuidValue = uuid.toString();
+ char[] buffer = uuidValue.toCharArray();
+ aUUID.parseUUIDString(buffer, 0, uuidValue.length());
+ uuidSerde.serialize(aUUID, out);
+ }
+
+ private void serializeString(Object value, DataOutput out) throws
HyracksDataException {
+ aString.setValue(value.toString());
+ stringSerde.serialize(aString, out);
+ }
+
+ private void serializeDecimal(BigDecimal value, DataOutput out) throws
HyracksDataException {
+ serializeDouble(value.doubleValue(), out);
+ }
+
+ private void serializeBinary(Object value, DataOutput out) throws
HyracksDataException {
+ ByteBuffer byteBuffer = (ByteBuffer) value;
+ aBinary.setValue(byteBuffer.array(), 0, byteBuffer.array().length);
+ binarySerde.serialize(aBinary, out);
+ }
+
+ private static HyracksDataException createUnsupportedException(Type type) {
+ return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Iceberg
Parser", type.toString());
+ }
+
+ private static void ensureDecimalToDoubleEnabled(Type type,
IcebergConverterContext context)
+ throws RuntimeDataException {
+ if (!context.isDecimalToDoubleEnabled()) {
+ throw new
RuntimeDataException(ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION,
type.toString(),
+ ExternalDataConstants.ParquetOptions.DECIMAL_TO_DOUBLE);
+ }
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergParserFactory.java
similarity index 63%
copy from
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
copy to
asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergParserFactory.java
index c5b5da3f36..18041fde6e 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergParserFactory.java
@@ -16,15 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.asterix.external.util.iceberg;
+package org.apache.asterix.external.parser.factory;
-public class IcebergConstants {
- private IcebergConstants() {
- throw new AssertionError("do not instantiate");
- }
+import org.apache.iceberg.Schema;
+
+public abstract class IcebergParserFactory<T> extends
AbstractGenericDataParserFactory<T> {
+ private static final long serialVersionUID = 1L;
- public static final String ICEBERG_TABLE_FORMAT = "iceberg";
- public static final String APACHE_ICEBERG_TABLE_FORMAT = "apache-iceberg";
- public static final String ICEBERG_SOURCE_PROPERTY_KEY = "catalogSource";
- public static final String ICEBERG_WAREHOUSE_PROPERTY_KEY = "warehouse";
+ protected Schema projectedSchema;
+
+ public void setProjectedSchema(Schema projectedSchema) {
+ this.projectedSchema = projectedSchema;
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergTableParquetDataParserFactory.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergTableParquetDataParserFactory.java
new file mode 100644
index 0000000000..9a2ec65d36
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/IcebergTableParquetDataParserFactory.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.parser.factory;
+
+import java.util.List;
+
+import org.apache.asterix.external.api.IExternalDataRuntimeContext;
+import org.apache.asterix.external.api.IRecordDataParser;
+import org.apache.asterix.external.api.IStreamDataParser;
+import org.apache.asterix.external.parser.IcebergParquetDataParser;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.iceberg.data.Record;
+
+public class IcebergTableParquetDataParserFactory extends
IcebergParserFactory<Record> {
+
+ private static final long serialVersionUID = 1L;
+ private static final List<String> PARSER_FORMAT =
List.of(IcebergConstants.ICEBERG_PARQUET_FORMAT);
+
+ @Override
+ public IStreamDataParser
createInputStreamParser(IExternalDataRuntimeContext context) {
+ throw new UnsupportedOperationException("Stream parser is not
supported");
+ }
+
+ @Override
+ public void setMetaType(ARecordType metaType) {
+ // no MetaType to set.
+ }
+
+ @Override
+ public List<String> getParserFormats() {
+ return PARSER_FORMAT;
+ }
+
+ @Override
+ public IRecordDataParser<Record>
createRecordParser(IExternalDataRuntimeContext context) {
+ return createParser(context);
+ }
+
+ @Override
+ public Class<?> getRecordClass() {
+ return Record.class;
+ }
+
+ private IcebergParquetDataParser createParser(IExternalDataRuntimeContext
context) {
+ return new IcebergParquetDataParser(context, configuration,
projectedSchema);
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
index 0360c15fb1..0f0f8d4661 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/DatasourceFactoryProvider.java
@@ -44,6 +44,7 @@ import
org.apache.asterix.external.input.stream.factory.LocalFSInputStreamFactor
import
org.apache.asterix.external.input.stream.factory.SocketServerInputStreamFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -124,8 +125,10 @@ public class DatasourceFactoryProvider {
Map<String, Class<?>> formatClassMap = factories.get(adaptorName);
String format =
configuration.get(ExternalDataConstants.KEY_FORMAT);
if (isDeltaTable(configuration)) {
- format = configuration.get(ExternalDataConstants.TABLE_FORMAT);
- return getInstance(formatClassMap.getOrDefault(format,
formatClassMap.get(DEFAULT_FORMAT)));
+ return getDeltaInstance(configuration, formatClassMap);
+ }
+ if (IcebergUtils.isIcebergTable(configuration)) {
+ return getIcebergInstance(configuration, formatClassMap);
}
return getInstance(formatClassMap.getOrDefault(format,
formatClassMap.get(DEFAULT_FORMAT)));
}
@@ -197,4 +200,16 @@ public class DatasourceFactoryProvider {
throw new
AsterixException(ErrorCode.PROVIDER_DATASOURCE_FACTORY_DUPLICATE_FORMAT_MAPPING,
key);
}
}
+
+ private static IRecordReaderFactory<?> getDeltaInstance(Map<String,
String> configuration,
+ Map<String, Class<?>> formatClassMap) throws AsterixException {
+ String format = configuration.get(ExternalDataConstants.TABLE_FORMAT);
+ return getInstance(formatClassMap.getOrDefault(format,
formatClassMap.get(DEFAULT_FORMAT)));
+ }
+
+ private static IRecordReaderFactory<?> getIcebergInstance(Map<String,
String> configuration,
+ Map<String, Class<?>> formatClassMap) throws AsterixException {
+ String format = IcebergUtils.getIcebergFormat(configuration);
+ return getInstance(formatClassMap.getOrDefault(format,
formatClassMap.get(DEFAULT_FORMAT)));
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
index 4c10339d99..afa799e782 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/provider/ParserFactoryProvider.java
@@ -32,6 +32,8 @@ import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.external.api.IDataParserFactory;
import org.apache.asterix.external.util.ExternalDataConstants;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
import org.apache.commons.io.IOUtils;
import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
@@ -51,7 +53,12 @@ public class ParserFactoryProvider {
// ExternalDataUtils.getDataverse(configuration),
parserFactoryName);
throw new NotImplementedException();
} else {
- String parserFactoryKey =
ExternalDataUtils.getParserFactory(configuration);
+ String parserFactoryKey;
+ if (IcebergUtils.isIcebergTable(configuration)) {
+ parserFactoryKey = IcebergConstants.ICEBERG_PARQUET_FORMAT;
+ } else {
+ parserFactoryKey =
ExternalDataUtils.getParserFactory(configuration);
+ }
parserFactory =
ParserFactoryProvider.getDataParserFactory(parserFactoryKey);
}
return parserFactory;
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index 2846cf1342..9b270746ac 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -255,6 +255,7 @@ public class ExternalDataConstants {
public static final String DUMMY_DATABASE_NAME = "dbname";
public static final String DUMMY_TYPE_NAME = "typeName";
public static final String DUMMY_DATAVERSE_NAME = "a.b.c";
+ public static final String FORMAT_ICEBERG = "iceberg";
public static final String FORMAT_APACHE_ICEBERG = "apache-iceberg";
public static final String FORMAT_DELTA = "delta";
public static final Set<String> ALL_FORMATS;
@@ -425,6 +426,16 @@ public class ExternalDataConstants {
public static final String TIMEZONE = "timezone";
}
+ public static class IcebergOptions {
+ private IcebergOptions() {
+ }
+
+ public static final String DECIMAL_TO_DOUBLE = "decimal-to-double";
+ public static final String TIMESTAMP_AS_LONG = "timestamp-to-long";
+ public static final String DATE_AS_INT = "date-to-int";
+ public static final String TIMEZONE = "timezone";
+ }
+
public static class ParquetOptions {
private ParquetOptions() {
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index 37946103e5..4c08667d8f 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -41,13 +41,16 @@ import static
org.apache.asterix.external.util.azure.blob.BlobUtils.validateAzur
import static
org.apache.asterix.external.util.azure.datalake.DatalakeUtils.validateAzureDataLakeProperties;
import static
org.apache.asterix.external.util.google.GCSUtils.configureHdfsJobConf;
import static
org.apache.asterix.external.util.google.GCSUtils.validateProperties;
+import static
org.apache.asterix.external.util.iceberg.IcebergUtils.isIcebergTable;
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
import static
org.apache.asterix.runtime.evaluators.functions.StringEvaluatorUtils.RESERVED_REGEX_CHARS;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import static org.msgpack.core.MessagePack.Code.ARRAY16;
+import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
@@ -512,6 +515,9 @@ public class ExternalDataUtils {
configuration.put(ExternalDataConstants.KEY_PARSER,
ExternalDataConstants.FORMAT_DELTA);
configuration.put(ExternalDataConstants.KEY_FORMAT,
ExternalDataConstants.FORMAT_DELTA);
}
+ if (isIcebergTable(configuration)) {
+ return;
+ }
prepareTableFormat(configuration);
}
}
@@ -613,7 +619,6 @@ public class ExternalDataUtils {
// If the table is in S3
if
(configuration.get(ExternalDataConstants.KEY_READER).equals(ExternalDataConstants.KEY_ADAPTER_NAME_AWS_S3))
{
-
conf.set(S3Constants.HADOOP_ACCESS_KEY_ID,
configuration.get(AwsConstants.ACCESS_KEY_ID_FIELD_NAME));
conf.set(S3Constants.HADOOP_SECRET_ACCESS_KEY,
configuration.get(AwsConstants.SECRET_ACCESS_KEY_FIELD_NAME));
@@ -1001,8 +1006,8 @@ public class ExternalDataUtils {
}
public static boolean supportsPushdown(Map<String, String> properties) {
- //Currently, only Apache Parquet/Delta table format is supported
- return isParquetFormat(properties) || isDeltaTable(properties);
+ //Currently, only Apache Parquet/Delta/Iceberg table format is
supported
+ return isParquetFormat(properties) || isDeltaTable(properties) ||
isIcebergTable(properties);
}
/**
@@ -1252,4 +1257,30 @@ public class ExternalDataUtils {
}
return properties;
}
+
+ public static ARecordType getExpectedType(String encodedRequestedFields)
throws IOException {
+ if (ALL_FIELDS_TYPE.getTypeName().equals(encodedRequestedFields)) {
+ //By default, return the entire records
+ return ALL_FIELDS_TYPE;
+ } else if (EMPTY_TYPE.getTypeName().equals(encodedRequestedFields)) {
+ //No fields were requested
+ return EMPTY_TYPE;
+ }
+ //A subset of the fields was requested
+ Base64.Decoder decoder = Base64.getDecoder();
+ byte[] typeBytes = decoder.decode(encodedRequestedFields);
+ DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(typeBytes));
+ return
ExternalDatasetProjectionFiltrationInfo.createTypeField(dataInputStream);
+ }
+
+ public static Map<String, FunctionCallInformation>
getFunctionCallInformationMap(
+ String encodedFunctionCallInformation) throws IOException {
+ if (encodedFunctionCallInformation != null &&
!encodedFunctionCallInformation.isEmpty()) {
+ Base64.Decoder decoder = Base64.getDecoder();
+ byte[] functionCallInfoMapBytes =
decoder.decode(encodedFunctionCallInformation);
+ DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(functionCallInfoMapBytes));
+ return
ExternalDatasetProjectionFiltrationInfo.createFunctionCallInformationMap(dataInputStream);
+ }
+ return null;
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
index 7c7e031768..e57b4d3727 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/HDFSUtils.java
@@ -21,7 +21,6 @@ package org.apache.asterix.external.util;
import static
org.apache.asterix.common.exceptions.ErrorCode.REQUIRED_PARAM_IF_PARAM_IS_PRESENT;
import static
org.apache.asterix.external.util.ExternalDataUtils.validateIncludeExclude;
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
-import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE;
import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
import java.io.ByteArrayInputStream;
@@ -61,7 +60,6 @@ import
org.apache.asterix.external.input.record.reader.hdfs.parquet.ParquetReadS
import org.apache.asterix.external.input.stream.HDFSInputStream;
import org.apache.asterix.external.util.ExternalDataConstants.ParquetOptions;
import org.apache.asterix.om.types.ARecordType;
-import
org.apache.asterix.runtime.projection.ExternalDatasetProjectionFiltrationInfo;
import org.apache.asterix.runtime.projection.FunctionCallInformation;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
@@ -456,18 +454,7 @@ public class HDFSUtils {
public static ARecordType getExpectedType(Configuration configuration)
throws IOException {
String encoded =
configuration.get(ExternalDataConstants.KEY_REQUESTED_FIELDS, "");
- if (ALL_FIELDS_TYPE.getTypeName().equals(encoded)) {
- //By default, return the entire records
- return ALL_FIELDS_TYPE;
- } else if (EMPTY_TYPE.getTypeName().equals(encoded)) {
- //No fields were requested
- return EMPTY_TYPE;
- }
- //A subset of the fields was requested
- Base64.Decoder decoder = Base64.getDecoder();
- byte[] typeBytes = decoder.decode(encoded);
- DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(typeBytes));
- return
ExternalDatasetProjectionFiltrationInfo.createTypeField(dataInputStream);
+ return ExternalDataUtils.getExpectedType(encoded);
}
public static void setFunctionCallInformationMap(Map<String,
FunctionCallInformation> funcCallInfoMap,
@@ -479,13 +466,7 @@ public class HDFSUtils {
public static Map<String, FunctionCallInformation>
getFunctionCallInformationMap(Configuration conf)
throws IOException {
String encoded =
conf.get(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION,
"");
- if (!encoded.isEmpty()) {
- Base64.Decoder decoder = Base64.getDecoder();
- byte[] functionCallInfoMapBytes = decoder.decode(encoded);
- DataInputStream dataInputStream = new DataInputStream(new
ByteArrayInputStream(functionCallInfoMapBytes));
- return
ExternalDatasetProjectionFiltrationInfo.createFunctionCallInformationMap(dataInputStream);
- }
- return null;
+ return ExternalDataUtils.getFunctionCallInformationMap(encoded);
}
public static void setWarnings(List<Warning> warnings, Configuration conf)
throws IOException {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
index 0ba702fce8..f68136629c 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/glue/GlueUtils.java
@@ -23,12 +23,24 @@ import static
org.apache.asterix.external.util.aws.AwsConstants.SERVICE_END_POIN
import static
org.apache.asterix.external.util.aws.AwsUtils.buildCredentialsProvider;
import static
org.apache.asterix.external.util.aws.AwsUtils.validateAndGetRegion;
+import java.io.IOException;
import java.util.Map;
+import java.util.UUID;
import org.apache.asterix.common.api.IApplicationContext;
import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.awsclient.EnsureCloseAWSClientFactory;
import org.apache.asterix.external.util.aws.AwsUtils;
import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.glue.GlueCatalog;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
import software.amazon.awssdk.auth.credentials.AwsCredentialsProvider;
import software.amazon.awssdk.regions.Region;
@@ -36,6 +48,8 @@ import software.amazon.awssdk.services.glue.GlueClient;
import software.amazon.awssdk.services.glue.GlueClientBuilder;
public class GlueUtils {
+ private static final Logger LOGGER = LogManager.getLogger();
+
private GlueUtils() {
throw new AssertionError("do not instantiate");
}
@@ -64,4 +78,38 @@ public class GlueUtils {
awsClients.setConsumingClient(builder.build());
return awsClients;
}
+
+ public static Catalog initializeCatalog(Map<String, String>
catalogProperties, String namespace)
+ throws CompilationException {
+ // using uuid for a custom catalog name; this is not the real catalog
name and is not used by services
+ // it is used by iceberg internally for logging and scoping in
multi-catalog environments
+ GlueCatalog catalog = new GlueCatalog();
+ try {
+ catalogProperties.put(CatalogProperties.FILE_IO_IMPL,
IcebergConstants.Aws.S3_FILE_IO);
+ catalogProperties.put(AwsProperties.CLIENT_FACTORY,
EnsureCloseAWSClientFactory.class.getName());
+
+ String catalogName = UUID.randomUUID().toString();
+ catalog.initialize(catalogName, catalogProperties);
+ LOGGER.debug("Initialized AWS Glue catalog: {}", catalogName);
+
+ if (namespace != null &&
!catalog.namespaceExists(Namespace.of(namespace))) {
+ throw
CompilationException.create(ErrorCode.ICEBERG_NAMESPACE_DOES_NOT_EXIST,
namespace);
+ }
+ } catch (CompilationException ex) {
+ throw ex;
+ } catch (Throwable ex) {
+ throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, ex.getMessage());
+ }
+ return catalog;
+ }
+
+ public static void closeCatalog(GlueCatalog catalog) throws
CompilationException {
+ try {
+ if (catalog != null) {
+ catalog.close();
+ }
+ } catch (IOException ex) {
+ throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, ex.getMessage());
+ }
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 8c81c1b9ff..9608108551 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -85,6 +85,7 @@ import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.external.util.ExternalDataUtils;
import org.apache.asterix.external.util.aws.AwsUtils;
import org.apache.asterix.external.util.aws.AwsUtils.CloseableAwsClients;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -317,6 +318,13 @@ public class S3Utils {
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
srcLoc, ExternalDataConstants.KEY_FORMAT);
}
+ // iceberg tables can be created without passing the bucket,
+ // only validate bucket presence if container is passed
+ String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
+ if (IcebergUtils.isIcebergTable(configuration) && container == null) {
+ return;
+ }
+
validateIncludeExclude(configuration);
try {
// TODO(htowaileb): maybe something better, this will check to
ensure type is supported before creation
@@ -330,7 +338,6 @@ public class S3Utils {
S3Client s3Client = (S3Client) awsClients.getConsumingClient();
S3Response response;
boolean useOldApi = false;
- String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
String prefix = getPrefix(configuration);
try {
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/biglake_metastore/BiglakeMetastore.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/biglake_metastore/BiglakeMetastore.java
new file mode 100644
index 0000000000..9ebbe149a1
--- /dev/null
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/google/biglake_metastore/BiglakeMetastore.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.util.google.biglake_metastore;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.asterix.common.exceptions.CompilationException;
+import org.apache.asterix.common.exceptions.ErrorCode;
+import org.apache.asterix.external.awsclient.EnsureCloseAWSClientFactory;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.iceberg.CatalogProperties;
+import org.apache.iceberg.aws.AwsProperties;
+import org.apache.iceberg.aws.glue.GlueCatalog;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public class BiglakeMetastore {
+ private static final Logger LOGGER = LogManager.getLogger();
+
+ private BiglakeMetastore() {
+ throw new AssertionError("do not instantiate");
+ }
+
+ public static Catalog initializeCatalog(Map<String, String>
catalogProperties, String namespace)
+ throws CompilationException {
+ // using uuid for a custom catalog name; this is not the real catalog
name and is not used by services
+ // it is used by iceberg internally for logging and scoping in
multi-catalog environments
+ GlueCatalog catalog = new GlueCatalog();
+ try {
+ catalogProperties.put(CatalogProperties.FILE_IO_IMPL,
IcebergConstants.Aws.S3_FILE_IO);
+ catalogProperties.put(AwsProperties.CLIENT_FACTORY,
EnsureCloseAWSClientFactory.class.getName());
+
+ String catalogName = UUID.randomUUID().toString();
+ catalog.initialize(catalogName, catalogProperties);
+ LOGGER.debug("Initialized AWS Glue catalog: {}", catalogName);
+
+ if (namespace != null &&
!catalog.namespaceExists(Namespace.of(namespace))) {
+ throw
CompilationException.create(ErrorCode.ICEBERG_NAMESPACE_DOES_NOT_EXIST,
namespace);
+ }
+ } catch (CompilationException ex) {
+ throw ex;
+ } catch (Throwable ex) {
+ throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, ex.getMessage());
+ }
+ return catalog;
+ }
+
+ public static void closeCatalog(GlueCatalog catalog) throws
CompilationException {
+ try {
+ if (catalog != null) {
+ catalog.close();
+ }
+ } catch (IOException ex) {
+ throw CompilationException.create(ErrorCode.EXTERNAL_SOURCE_ERROR,
ex, ex.getMessage());
+ }
+ }
+}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
index c5b5da3f36..3921e36225 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergConstants.java
@@ -24,7 +24,20 @@ public class IcebergConstants {
}
public static final String ICEBERG_TABLE_FORMAT = "iceberg";
- public static final String APACHE_ICEBERG_TABLE_FORMAT = "apache-iceberg";
+ public static final String ICEBERG_CATALOG_NAME = "catalogName";
public static final String ICEBERG_SOURCE_PROPERTY_KEY = "catalogSource";
public static final String ICEBERG_WAREHOUSE_PROPERTY_KEY = "warehouse";
+ public static final String ICEBERG_TABLE_NAME_PROPERTY_KEY = "tableName";
+ public static final String ICEBERG_NAMESPACE_PROPERTY_KEY = "namespace";
+ public static final String ICEBERG_SNAPSHOT_ID_PROPERTY_KEY = "snapshotId";
+ public static final String ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY =
"snapshotTimestamp";
+
+ public static final String ICEBERG_PROPERTY_PREFIX_INTERNAL =
"catalog-property#";
+
+ public static final String ICEBERG_PARQUET_FORMAT = "iceberg-parquet";
+ public static final String ICEBERG_AVRO_FORMAT = "iceberg-avro";
+
+ public class Aws {
+ public static final String S3_FILE_IO =
"org.apache.iceberg.aws.s3.S3FileIO";
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
index 9654ce3877..369b5fe2a6 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
@@ -18,14 +18,29 @@
*/
package org.apache.asterix.external.util.iceberg;
+import static
org.apache.asterix.common.exceptions.ErrorCode.UNSUPPORTED_ICEBERG_DATA_FORMAT;
+import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_AVRO_FORMAT;
+import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_PARQUET_FORMAT;
import static
org.apache.asterix.external.util.iceberg.IcebergConstants.ICEBERG_TABLE_FORMAT;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
+import java.util.Optional;
import org.apache.asterix.common.config.CatalogConfig;
+import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.exceptions.CompilationException;
import org.apache.asterix.common.exceptions.ErrorCode;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.aws.glue.GlueUtils;
+import
org.apache.asterix.external.util.google.biglake_metastore.BiglakeMetastore;
+import org.apache.asterix.om.types.ARecordType;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.aws.glue.GlueCatalog;
+import org.apache.iceberg.catalog.Catalog;
public class IcebergUtils {
@@ -87,4 +102,103 @@ public class IcebergUtils {
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, key);
}
}
+
+ public static void validateIcebergTableProperties(Map<String, String>
properties) throws CompilationException {
+ String tableName =
properties.get(IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
+ if (tableName == null || tableName.isEmpty()) {
+ throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
+ IcebergConstants.ICEBERG_TABLE_NAME_PROPERTY_KEY);
+ }
+ }
+
+ /**
+ * Extracts and returns the iceberg catalog properties from the provided
configuration
+ *
+ * @param configuration configuration
+ * @return catalog properties
+ */
+ public static Map<String, String> filterCatalogProperties(Map<String,
String> configuration) {
+ Map<String, String> catalogProperties = new HashMap<>();
+ Iterator<Map.Entry<String, String>> iterator =
configuration.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, String> entry = iterator.next();
+ if
(entry.getKey().startsWith(IcebergConstants.ICEBERG_PROPERTY_PREFIX_INTERNAL)) {
+ catalogProperties.put(
+
entry.getKey().substring(IcebergConstants.ICEBERG_PROPERTY_PREFIX_INTERNAL.length()),
+ entry.getValue());
+ iterator.remove();
+ }
+ }
+ return catalogProperties;
+ }
+
+ /**
+ * Namespace can be null (not passed), or it can be passed for the catalog
or the collection. If it is passed
+ * for both, namespace for the collection will be used, otherwise, the
namespace for the catalog will be used.
+ *
+ * @param configuration configuration
+ * @return namespace
+ */
+ public static String getNamespace(Map<String, String> configuration) {
+ String namespace =
configuration.get(IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY);
+ if (namespace != null) {
+ return namespace;
+ }
+
+ String catalogNamespaceProperty =
+ IcebergConstants.ICEBERG_PROPERTY_PREFIX_INTERNAL +
IcebergConstants.ICEBERG_NAMESPACE_PROPERTY_KEY;
+ namespace = configuration.get(catalogNamespaceProperty);
+ return namespace;
+ }
+
+ public static String getIcebergFormat(Map<String, String> configuration)
throws AsterixException {
+ String format =
configuration.get(ExternalDataConstants.KEY_FORMAT).toLowerCase();
+ return switch (format) {
+ case ExternalDataConstants.FORMAT_PARQUET ->
ICEBERG_PARQUET_FORMAT;
+ case ExternalDataConstants.FORMAT_AVRO -> ICEBERG_AVRO_FORMAT;
+ default -> throw
AsterixException.create(UNSUPPORTED_ICEBERG_DATA_FORMAT, format);
+ };
+ }
+
+ public static Catalog initializeCatalog(Map<String, String>
catalogProperties, String namespace)
+ throws CompilationException {
+ String source =
catalogProperties.get(IcebergConstants.ICEBERG_SOURCE_PROPERTY_KEY);
+ Optional<CatalogConfig.IcebergCatalogSource> catalogSource =
CatalogConfig.getIcebergCatalogSource(source);
+ if (catalogSource.isEmpty()) {
+ throw
CompilationException.create(ErrorCode.UNSUPPORTED_ICEBERG_CATALOG_SOURCE,
source);
+ }
+
+ return switch (catalogSource.get()) {
+ case CatalogConfig.IcebergCatalogSource.AWS_GLUE ->
GlueUtils.initializeCatalog(catalogProperties, namespace);
+ case CatalogConfig.IcebergCatalogSource.BIGLAKE_METASTORE ->
BiglakeMetastore.initializeCatalog(catalogProperties, namespace);
+ case CatalogConfig.IcebergCatalogSource.REST -> null;
+ };
+ }
+
+ public static void closeCatalog(Catalog catalog) throws
CompilationException {
+ if (catalog != null) {
+ if (catalog instanceof GlueCatalog) {
+ GlueUtils.closeCatalog((GlueCatalog) catalog);
+ }
+ }
+ }
+
+ public static TableScan setSnapshot(Map<String, String> configuration,
TableScan scan) {
+ String snapshot =
configuration.get(IcebergConstants.ICEBERG_SNAPSHOT_ID_PROPERTY_KEY);
+ if (snapshot != null) {
+ scan = scan.useSnapshot(Long.parseLong(snapshot));
+ } else {
+ String asOfTimestamp =
configuration.get(IcebergConstants.ICEBERG_SNAPSHOT_TIMESTAMP_PROPERTY_KEY);
+ if (asOfTimestamp != null) {
+ scan = scan.asOfTime(Long.parseLong(asOfTimestamp));
+ }
+ }
+ return scan;
+ }
+
+ public static String[] getProjectedFields(Map<String, String>
configuration) throws IOException {
+ String encoded =
configuration.get(ExternalDataConstants.KEY_REQUESTED_FIELDS);
+ ARecordType projectedRecordType =
ExternalDataUtils.getExpectedType(encoded);
+ return projectedRecordType.getFieldNames();
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
index 793516ce03..e9ea9a6682 100644
---
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
+++
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IDataParserFactory
@@ -23,4 +23,5 @@ org.apache.asterix.external.parser.factory.RSSParserFactory
org.apache.asterix.external.parser.factory.TweetParserFactory
org.apache.asterix.external.parser.factory.NoOpDataParserFactory
org.apache.asterix.external.parser.factory.AvroDataParserFactory
-org.apache.asterix.external.parser.factory.DeltaTableDataParserFactory
\ No newline at end of file
+org.apache.asterix.external.parser.factory.DeltaTableDataParserFactory
+org.apache.asterix.external.parser.factory.IcebergTableParquetDataParserFactory
\ No newline at end of file
diff --git
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
index 0eef480f2f..e699937321 100644
---
a/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
+++
b/asterixdb/asterix-external-data/src/main/resources/META-INF/services/org.apache.asterix.external.api.IRecordReaderFactory
@@ -21,8 +21,9 @@ org.apache.asterix.external.input.HDFSDataSourceFactory
org.apache.asterix.external.input.record.reader.stream.StreamRecordReaderFactory
org.apache.asterix.external.input.record.reader.http.HttpServerRecordReaderFactory
org.apache.asterix.external.input.record.reader.aws.AwsS3ReaderFactory
-org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory
org.apache.asterix.external.input.record.reader.aws.delta.AwsS3DeltaReaderFactory
+org.apache.asterix.external.input.record.reader.aws.iceberg.IcebergParquetRecordReaderFactory
+org.apache.asterix.external.input.record.reader.aws.parquet.AwsS3ParquetReaderFactory
org.apache.asterix.external.input.record.reader.gcs.GCSReaderFactory
org.apache.asterix.external.input.record.reader.azure.blob.AzureBlobReaderFactory
org.apache.asterix.external.input.record.reader.azure.datalake.AzureDataLakeReaderFactory
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 01ce9a98d4..f0a803ae11 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -68,6 +68,8 @@ import
org.apache.asterix.external.operators.ExternalScanOperatorDescriptor;
import org.apache.asterix.external.operators.FeedIntakeOperatorDescriptor;
import org.apache.asterix.external.provider.AdapterFactoryProvider;
import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.external.util.iceberg.IcebergConstants;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.BinaryIntegerInspector;
import org.apache.asterix.formats.nontagged.LinearizeComparatorFactoryProvider;
@@ -87,6 +89,8 @@ import org.apache.asterix.metadata.entities.FeedPolicyEntity;
import org.apache.asterix.metadata.entities.FullTextConfigMetadataEntity;
import org.apache.asterix.metadata.entities.FullTextFilterMetadataEntity;
import org.apache.asterix.metadata.entities.Function;
+import org.apache.asterix.metadata.entities.IcebergCatalog;
+import org.apache.asterix.metadata.entities.IcebergCatalogDetails;
import org.apache.asterix.metadata.entities.Index;
import org.apache.asterix.metadata.entities.Synonym;
import org.apache.asterix.metadata.feeds.FeedMetadataUtil;
@@ -1003,13 +1007,31 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
dataset.getDataverseName().getCanonicalForm());
setExternalEntityId(configuration);
setSourceType(configuration, adapterName);
+
+ // for iceberg table, add catalog properties to the configuration
+ addIcebergCatalogPropertiesIfNeeded(configuration);
return
AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(),
adapterName,
configuration, itemType, null, warningCollector,
filterEvaluatorFactory);
+ } catch (AlgebricksException e) {
+ throw e;
} catch (Exception e) {
throw new AlgebricksException("Unable to create adapter", e);
}
}
+ private void addIcebergCatalogPropertiesIfNeeded(Map<String, String>
configuration) throws AlgebricksException {
+ if (IcebergUtils.isIcebergTable(configuration)) {
+ String catalogName =
configuration.get(IcebergConstants.ICEBERG_CATALOG_NAME);
+ IcebergCatalog catalog =
+ (IcebergCatalog)
MetadataManager.INSTANCE.getCatalog(getMetadataTxnContext(), catalogName);
+ IcebergCatalogDetails details = (IcebergCatalogDetails)
catalog.getCatalogDetails();
+ for (Map.Entry<String, String> entry :
details.getProperties().entrySet()) {
+
configuration.putIfAbsent(IcebergConstants.ICEBERG_PROPERTY_PREFIX_INTERNAL +
entry.getKey(),
+ entry.getValue());
+ }
+ }
+ }
+
protected void setSourceType(Map<String, String> configuration, String
adapterName) {
configuration.putIfAbsent(ExternalDataConstants.KEY_EXTERNAL_SOURCE_TYPE,
adapterName);
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
index e637f28308..de309268b8 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/DatasetUtil.java
@@ -53,6 +53,7 @@ import org.apache.asterix.common.metadata.MetadataUtil;
import org.apache.asterix.common.transactions.IRecoveryManager;
import org.apache.asterix.common.utils.JobUtils;
import org.apache.asterix.external.util.ExternalDataUtils;
+import org.apache.asterix.external.util.iceberg.IcebergUtils;
import org.apache.asterix.formats.base.IDataFormat;
import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
import org.apache.asterix.formats.nontagged.TypeTraitProvider;
@@ -879,6 +880,11 @@ public class DatasetUtil {
.isDeltaTable(((ExternalDatasetDetails)
dataset.getDatasetDetails()).getProperties());
}
+ public static boolean isIcebergTable(Dataset dataset) {
+ return dataset.getDatasetType() == DatasetType.EXTERNAL
+ && IcebergUtils.isIcebergTable(((ExternalDatasetDetails)
dataset.getDatasetDetails()).getProperties());
+ }
+
public static boolean isParquetFormat(Dataset dataset) {
return dataset.getDatasetType() == DatasetType.EXTERNAL &&
ExternalDataUtils
.isParquetFormat(((ExternalDatasetDetails)
dataset.getDatasetDetails()).getProperties());
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
index 79db1a0ea4..37cc9d8e5c 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/IndexUtil.java
@@ -20,6 +20,7 @@ package org.apache.asterix.metadata.utils;
import static org.apache.asterix.external.util.ExternalDataUtils.isDeltaTable;
import static
org.apache.asterix.external.util.ExternalDataUtils.isParquetFormat;
+import static
org.apache.asterix.external.util.iceberg.IcebergUtils.isIcebergTable;
import static
org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE;
import static
org.apache.hyracks.storage.am.common.dataflow.IndexDropOperatorDescriptor.DropOption;
@@ -44,6 +45,7 @@ import org.apache.asterix.common.transactions.TxnId;
import org.apache.asterix.external.indexing.ExternalFile;
import
org.apache.asterix.external.input.filter.NoOpDeltaTableFilterEvaluatorFactory;
import
org.apache.asterix.external.input.filter.NoOpExternalFilterEvaluatorFactory;
+import
org.apache.asterix.external.input.filter.NoOpIcebergTableFilterEvaluatorFactory;
import org.apache.asterix.external.input.filter.ParquetFilterEvaluatorFactory;
import org.apache.asterix.external.util.ExternalDataPrefix;
import org.apache.asterix.metadata.dataset.DatasetFormatInfo;
@@ -353,43 +355,73 @@ public class IndexUtil {
IVariableTypeEnvironment typeEnv, IProjectionFiltrationInfo
projectionFiltrationInfo,
Map<String, String> properties) throws AlgebricksException {
if (isDeltaTable(properties)) {
- if (projectionFiltrationInfo ==
DefaultProjectionFiltrationInfo.INSTANCE) {
- return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
- } else {
- DeltaTableFilterBuilder builder = new DeltaTableFilterBuilder(
- (ExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo, context, typeEnv);
- return builder.build();
- }
- } else if (isParquetFormat(properties)) {
- if (projectionFiltrationInfo ==
DefaultProjectionFiltrationInfo.INSTANCE) {
- return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
- } else {
- ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
- ExternalDatasetProjectionFiltrationInfo pfi =
- (ExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo;
- IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory
=
- NoOpExternalFilterEvaluatorFactory.INSTANCE;
- if (!prefix.getPaths().isEmpty()) {
- ExternalFilterBuilder externalFilterBuilder =
- new ExternalFilterBuilder(pfi, context, typeEnv,
prefix);
- externalFilterEvaluatorFactory =
externalFilterBuilder.build();
- }
- ParquetFilterBuilder builder = new ParquetFilterBuilder(
- (ParquetExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo, context, typeEnv);
- return new
ParquetFilterEvaluatorFactory(externalFilterEvaluatorFactory,
- builder.buildFilterPredicate());
- }
+ return createDeltaFilter(projectionFiltrationInfo, context,
typeEnv);
+ }
+ if (isIcebergTable(properties)) {
+ return createIcebergFilter(projectionFiltrationInfo, context,
typeEnv);
+ }
+ if (isParquetFormat(properties)) {
+ return createParquetFilter(projectionFiltrationInfo, context,
typeEnv, properties);
+ }
+ return createExternalFilter(projectionFiltrationInfo, context,
typeEnv, properties);
+ }
+
+ private static IExternalFilterEvaluatorFactory
createDeltaFilter(IProjectionFiltrationInfo projectionFiltrationInfo,
+ JobGenContext context, IVariableTypeEnvironment typeEnv) throws
AlgebricksException {
+ if (projectionFiltrationInfo ==
DefaultProjectionFiltrationInfo.INSTANCE) {
+ return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
} else {
- if (projectionFiltrationInfo ==
DefaultProjectionFiltrationInfo.INSTANCE) {
- return NoOpExternalFilterEvaluatorFactory.INSTANCE;
- } else {
- ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
- ExternalDatasetProjectionFiltrationInfo pfi =
- (ExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo;
- ExternalFilterBuilder build = new ExternalFilterBuilder(pfi,
context, typeEnv, prefix);
- return build.build();
+ DeltaTableFilterBuilder builder = new DeltaTableFilterBuilder(
+ (ExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo, context, typeEnv);
+ return builder.build();
+ }
+ }
+
+ private static IExternalFilterEvaluatorFactory createIcebergFilter(
+ IProjectionFiltrationInfo projectionFiltrationInfo, JobGenContext
context, IVariableTypeEnvironment typeEnv)
+ throws AlgebricksException {
+ if (projectionFiltrationInfo ==
DefaultProjectionFiltrationInfo.INSTANCE) {
+ return NoOpIcebergTableFilterEvaluatorFactory.INSTANCE;
+ } else {
+ // IcebergTableFilterBuilder builder = new
IcebergTableFilterBuilder(
+ // (ExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo, context, typeEnv);
+ // return builder.build();
+ return NoOpIcebergTableFilterEvaluatorFactory.INSTANCE;
+ }
+ }
+
+ private static IExternalFilterEvaluatorFactory createParquetFilter(
+ IProjectionFiltrationInfo projectionFiltrationInfo, JobGenContext
context, IVariableTypeEnvironment typeEnv,
+ Map<String, String> properties) throws AlgebricksException {
+ if (projectionFiltrationInfo ==
DefaultProjectionFiltrationInfo.INSTANCE) {
+ return NoOpDeltaTableFilterEvaluatorFactory.INSTANCE;
+ } else {
+ ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+ ExternalDatasetProjectionFiltrationInfo pfi =
+ (ExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo;
+ IExternalFilterEvaluatorFactory externalFilterEvaluatorFactory =
+ NoOpExternalFilterEvaluatorFactory.INSTANCE;
+ if (!prefix.getPaths().isEmpty()) {
+ ExternalFilterBuilder externalFilterBuilder = new
ExternalFilterBuilder(pfi, context, typeEnv, prefix);
+ externalFilterEvaluatorFactory = externalFilterBuilder.build();
}
+ ParquetFilterBuilder builder = new ParquetFilterBuilder(
+ (ParquetExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo, context, typeEnv);
+ return new
ParquetFilterEvaluatorFactory(externalFilterEvaluatorFactory,
builder.buildFilterPredicate());
}
}
+ private static IExternalFilterEvaluatorFactory createExternalFilter(
+ IProjectionFiltrationInfo projectionFiltrationInfo, JobGenContext
context, IVariableTypeEnvironment typeEnv,
+ Map<String, String> properties) throws AlgebricksException {
+ if (projectionFiltrationInfo ==
DefaultProjectionFiltrationInfo.INSTANCE) {
+ return NoOpExternalFilterEvaluatorFactory.INSTANCE;
+ } else {
+ ExternalDataPrefix prefix = new ExternalDataPrefix(properties);
+ ExternalDatasetProjectionFiltrationInfo pfi =
+ (ExternalDatasetProjectionFiltrationInfo)
projectionFiltrationInfo;
+ ExternalFilterBuilder build = new ExternalFilterBuilder(pfi,
context, typeEnv, prefix);
+ return build.build();
+ }
+ }
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
index 966bfa8cf8..07f62c9a76 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/utils/MetadataLockUtil.java
@@ -70,10 +70,10 @@ public class MetadataLockUtil implements IMetadataLockUtil {
String metaItemTypeDatabase, DataverseName
metaItemTypeDataverseName, String metaItemTypeName,
boolean metaItemTypeAnonymous, String nodeGroupName, String
compactionPolicyName,
boolean isDefaultCompactionPolicy, DatasetConfig.DatasetType
datasetType, Object datasetDetails,
- IMetadataProvider metadataProvider) throws AlgebricksException {
+ IMetadataProvider metadataProvider, String catalogName) throws
AlgebricksException {
createDatasetBeginPre(lockMgr, locks, database, dataverseName,
itemTypeDatabase, itemTypeDataverseName,
itemTypeName, itemTypeAnonymous, metaItemTypeDatabase,
metaItemTypeDataverseName, metaItemTypeName,
- metaItemTypeAnonymous, nodeGroupName, compactionPolicyName,
isDefaultCompactionPolicy);
+ metaItemTypeAnonymous, nodeGroupName, compactionPolicyName,
isDefaultCompactionPolicy, catalogName);
lockMgr.acquireDatasetWriteLock(locks, database, dataverseName,
datasetName);
}
@@ -81,7 +81,7 @@ public class MetadataLockUtil implements IMetadataLockUtil {
DataverseName dataverseName, String itemTypeDatabase,
DataverseName itemTypeDataverseName,
String itemTypeName, boolean itemTypeAnonymous, String
metaItemTypeDatabase,
DataverseName metaItemTypeDataverseName, String metaItemTypeName,
boolean metaItemTypeAnonymous,
- String nodeGroupName, String compactionPolicyName, boolean
isDefaultCompactionPolicy)
+ String nodeGroupName, String compactionPolicyName, boolean
isDefaultCompactionPolicy, String catalogName)
throws AlgebricksException {
lockMgr.acquireDatabaseReadLock(locks, database);
lockMgr.acquireDataverseReadLock(locks, database, dataverseName);
@@ -114,6 +114,9 @@ public class MetadataLockUtil implements IMetadataLockUtil {
if (!isDefaultCompactionPolicy) {
lockMgr.acquireMergePolicyReadLock(locks, compactionPolicyName);
}
+ if (catalogName != null) {
+ lockMgr.acquireCatalogReadLock(locks, catalogName);
+ }
}
@Override
diff --git a/asterixdb/pom.xml b/asterixdb/pom.xml
index 88b3591806..8d6524fb67 100644
--- a/asterixdb/pom.xml
+++ b/asterixdb/pom.xml
@@ -102,7 +102,7 @@
<awsjavasdk.version>2.29.27</awsjavasdk.version>
<awsjavasdk.crt.version>0.33.3</awsjavasdk.crt.version>
- <parquet.version>1.15.2</parquet.version>
+ <parquet.version>1.16.0</parquet.version>
<hadoop-awsjavasdk.version>1.12.788</hadoop-awsjavasdk.version>
<azureblobjavasdk.version>12.31.1</azureblobjavasdk.version>
@@ -118,6 +118,8 @@
<io.opencensus.version>0.31.1</io.opencensus.version>
<protobuf-java.version>3.25.8</protobuf-java.version>
+ <icebergjavasdk.version>1.10.0</icebergjavasdk.version>
+
<implementation.title>Apache AsterixDB -
${project.name}</implementation.title>
<implementation.url>https://asterixdb.apache.org/</implementation.url>
<implementation.version>${project.version}</implementation.version>
@@ -1925,33 +1927,6 @@
<artifactId>avro</artifactId>
<version>1.12.0</version>
</dependency>
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-core</artifactId>
- <version>1.5.2</version>
- </dependency>
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-data</artifactId>
- <version>1.5.2</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.iceberg</groupId>
- <artifactId>iceberg-parquet</artifactId>
- <version>1.5.2</version>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
@@ -2001,6 +1976,33 @@
<artifactId>aws-crt</artifactId>
<version>${awsjavasdk.crt.version}</version>
</dependency>
+
+ <!-- Iceberg -->
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-core</artifactId>
+ <version>${icebergjavasdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-api</artifactId>
+ <version>${icebergjavasdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-data</artifactId>
+ <version>${icebergjavasdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-parquet</artifactId>
+ <version>${icebergjavasdk.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.iceberg</groupId>
+ <artifactId>iceberg-aws</artifactId>
+ <version>${icebergjavasdk.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>