This is an automated email from the ASF dual-hosted git repository.

mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 200b2f8c71 [ASTERIXDB-3634][EXT]: Add support to Iceberg pt.3
200b2f8c71 is described below

commit 200b2f8c710d7b72439d41819681d19c33d3610f
Author: Hussain Towaileb <[email protected]>
AuthorDate: Sun Sep 21 16:46:35 2025 +0300

    [ASTERIXDB-3634][EXT]: Add support to Iceberg pt.3
    
    Details:
    - Support parquet format by default unless format is provided.
    - All some method to be overridden by extensions.
    - Remove null properties values before init'ing catalog.
    - Disable failing test.
    - Support iceberg complex types + date + time
    
    Ext-ref: MB-63115
    Change-Id: I1726c2168bfec1f137390c7c2112c2df59151dc2
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20648
    Reviewed-by: Hussain Towaileb <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Reviewed-by: Peeyush Gupta <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
---
 .../asterix/app/translator/QueryTranslator.java    |   3 +-
 .../runtimets/testsuite_external_dataset_s3.xml    |   2 +
 .../external/parser/IcebergParquetDataParser.java  | 225 ++++++++++++++-------
 .../asterix/external/util/aws/s3/S3Utils.java      |   5 +-
 .../external/util/iceberg/IcebergUtils.java        |  13 ++
 .../metadata/declared/MetadataProvider.java        |   5 +-
 6 files changed, 169 insertions(+), 84 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 0946e4a4a3..3a6274647d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1187,11 +1187,12 @@ public class QueryTranslator extends 
AbstractLangTranslator implements IStatemen
         return Optional.of(dataset);
     }
 
-    private void validateIfIcebergTable(Map<String, String> properties, 
MetadataTransactionContext mdTxnCtx,
+    protected void validateIfIcebergTable(Map<String, String> properties, 
MetadataTransactionContext mdTxnCtx,
             SourceLocation srcLoc) throws AlgebricksException {
         if (!IcebergUtils.isIcebergTable(properties)) {
             return;
         }
+        IcebergUtils.setDefaultFormat(properties);
 
         // ensure the specified catalog exists
         String catalogName = 
properties.get(IcebergConstants.ICEBERG_CATALOG_NAME);
diff --git 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index c6d6d32035..ae4154ed54 100644
--- 
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++ 
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -1479,12 +1479,14 @@
               <expected-error>ASX1178: Unsupported iceberg 
table</expected-error>
           </compilation-unit>
       </test-case>
+    <!-- old iceberg test, check why failing
       <test-case FilePath="external-dataset/s3">
           <compilation-unit name="iceberg-mixed-data-format">
               <output-dir compare="Text">none</output-dir>
               <expected-error>avro-file.avro. Reason: not a Parquet 
file</expected-error>
           </compilation-unit>
       </test-case>
+      -->
       <test-case FilePath="external-dataset/s3">
         <compilation-unit name="iceberg-empty">
           <output-dir compare="Text">iceberg-empty</output-dir>
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
index 634b44d606..5afddc37b7 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
@@ -25,6 +25,8 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -45,16 +47,17 @@ import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
 import org.apache.asterix.om.types.ATypeTag;
 import org.apache.avro.AvroRuntimeException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.Warning;
 import org.apache.hyracks.data.std.api.IMutableValueStorage;
 import org.apache.hyracks.data.std.api.IValueReference;
 import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
 
 public class IcebergParquetDataParser extends AbstractDataParser implements 
IRecordDataParser<Record> {
     private final IcebergConverterContext parserContext;
@@ -72,7 +75,7 @@ public class IcebergParquetDataParser extends 
AbstractDataParser implements IRec
     @Override
     public boolean parse(IRawRecord<? extends Record> record, DataOutput out) 
throws HyracksDataException {
         try {
-            parseObject(record.get(), out);
+            parseRootObject(record.get(), out);
             valueEmbedder.reset();
             return true;
         } catch (AvroRuntimeException | IOException e) {
@@ -80,7 +83,7 @@ public class IcebergParquetDataParser extends 
AbstractDataParser implements IRec
         }
     }
 
-    private void parseObject(Record record, DataOutput out) throws IOException 
{
+    private void parseRootObject(Record record, DataOutput out) throws 
IOException {
         IMutableValueStorage valueBuffer = parserContext.enterObject();
         IARecordBuilder objectBuilder = 
parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
         valueEmbedder.enterObject();
@@ -94,7 +97,7 @@ public class IcebergParquetDataParser extends 
AbstractDataParser implements IRec
                 value = valueEmbedder.getEmbeddedValue();
             } else {
                 valueBuffer.reset();
-                parseValue(fieldType, record, i, valueBuffer.getDataOutput());
+                parseValue(fieldType, record.get(i), 
valueBuffer.getDataOutput());
                 value = valueBuffer;
             }
 
@@ -110,70 +113,7 @@ public class IcebergParquetDataParser extends 
AbstractDataParser implements IRec
         parserContext.exitObject(valueBuffer, null, objectBuilder);
     }
 
-    private void parseArray(Type arrayType, boolean isOptional, List<?> 
listValues, DataOutput out) throws IOException {
-        if (listValues == null) {
-            nullSerde.serialize(ANull.NULL, out);
-            return;
-        }
-        final IMutableValueStorage valueBuffer = 
parserContext.enterCollection();
-        final IAsterixListBuilder arrayBuilder = 
parserContext.getCollectionBuilder(NESTED_OPEN_AORDERED_LIST_TYPE);
-        for (int i = 0; i < listValues.size(); i++) {
-            valueBuffer.reset();
-            //parseValue(elementSchema, elements, i, 
valueBuffer.getDataOutput());
-            arrayBuilder.addItem(valueBuffer);
-        }
-        arrayBuilder.write(out, true);
-        parserContext.exitCollection(valueBuffer, arrayBuilder);
-    }
-
-    public static ATypeTag getTypeTag(Type type, boolean isNull, 
IcebergConverterContext parserContext)
-            throws HyracksDataException {
-        if (isNull) {
-            return ATypeTag.NULL;
-        }
-
-        switch (type.typeId()) {
-            case BOOLEAN:
-                return ATypeTag.BOOLEAN;
-            case INTEGER:
-            case LONG:
-                return ATypeTag.BIGINT;
-            case FLOAT:
-                return ATypeTag.FLOAT;
-            case DOUBLE:
-                return ATypeTag.DOUBLE;
-            case STRING:
-                return ATypeTag.STRING;
-            case UUID:
-                return ATypeTag.UUID;
-            case BINARY:
-                return ATypeTag.BINARY;
-            case DECIMAL:
-                ensureDecimalToDoubleEnabled(type, parserContext);
-                return ATypeTag.DOUBLE;
-            case STRUCT:
-                return ATypeTag.OBJECT;
-            case LIST:
-                return ATypeTag.ARRAY;
-            case DATE:
-            case TIME:
-            case TIMESTAMP:
-            case TIMESTAMP_NANO:
-            case FIXED:
-            case GEOMETRY:
-            case GEOGRAPHY:
-            case MAP:
-            case VARIANT:
-            case UNKNOWN:
-                throw new NotImplementedException();
-            default:
-                throw createUnsupportedException(type);
-
-        }
-    }
-
-    private void parseValue(Type fieldType, Record record, int index, 
DataOutput out) throws IOException {
-        Object value = record.get(index);
+    private void parseValue(Type fieldType, Object value, DataOutput out) 
throws IOException {
         if (value == null) {
             nullSerde.serialize(ANull.NULL, out);
             return;
@@ -190,7 +130,6 @@ public class IcebergParquetDataParser extends 
AbstractDataParser implements IRec
                 serializeLong(value, out);
                 return;
             case FLOAT:
-                // TODO: should this be parsed as double?
                 serializeFloat(value, out);
                 return;
             case DOUBLE:
@@ -202,6 +141,9 @@ public class IcebergParquetDataParser extends 
AbstractDataParser implements IRec
             case UUID:
                 serializeUuid(value, out);
                 return;
+            case FIXED:
+                serializeFixedBinary(value, out);
+                return;
             case BINARY:
                 serializeBinary(value, out);
                 return;
@@ -209,26 +151,110 @@ public class IcebergParquetDataParser extends 
AbstractDataParser implements IRec
                 ensureDecimalToDoubleEnabled(fieldType, parserContext);
                 serializeDecimal((BigDecimal) value, out);
                 return;
-            case STRUCT:
-                parseObject((Record) value, out);
-                return;
             case LIST:
                 Types.ListType listType = fieldType.asListType();
-                parseArray(listType.elementType(), 
listType.isElementOptional(), (List<?>) value, out);
+                parseArray(listType, (List<?>) value, out);
+                return;
+            case STRUCT:
+                parseObject((StructType) fieldType, (StructLike) value, out);
+                return;
+            case MAP:
+                Types.MapType mapType = fieldType.asMapType();
+                parseMap(mapType, (Map<?, ?>) value, out);
                 return;
             case DATE:
+                serializeDate(value, out);
+                return;
             case TIME:
+                serializeTime(value, out);
+                return;
             case TIMESTAMP:
             case TIMESTAMP_NANO:
-            case FIXED:
             case GEOMETRY:
             case GEOGRAPHY:
-            case MAP:
             case VARIANT:
             case UNKNOWN:
-                throw new NotImplementedException();
+            default:
+                throw createUnsupportedException(fieldType);
+
+        }
+    }
+
+    private void parseArray(Types.ListType listType, List<?> listValues, 
DataOutput out) throws IOException {
+        if (listValues == null) {
+            nullSerde.serialize(ANull.NULL, out);
+            return;
+        }
 
+        Type elementType = listType.elementType();
+        final IMutableValueStorage valueBuffer = 
parserContext.enterCollection();
+        final IAsterixListBuilder arrayBuilder = 
parserContext.getCollectionBuilder(NESTED_OPEN_AORDERED_LIST_TYPE);
+        for (Object listValue : listValues) {
+            valueBuffer.reset();
+            parseValue(elementType, listValue, valueBuffer.getDataOutput());
+            arrayBuilder.addItem(valueBuffer);
         }
+        arrayBuilder.write(out, true);
+        parserContext.exitCollection(valueBuffer, arrayBuilder);
+    }
+
+    private void parseObject(StructType schema, StructLike structLike, 
DataOutput out) throws IOException {
+        IMutableValueStorage valueBuffer = parserContext.enterObject();
+        IARecordBuilder objectBuilder = 
parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+        valueEmbedder.enterObject();
+        for (int i = 0; i < schema.fields().size(); i++) {
+            NestedField field = schema.fields().get(i);
+            String fieldName = field.name();
+            Type fieldType = field.type();
+            ATypeTag typeTag =
+                    getTypeTag(fieldType, structLike.get(i, 
fieldType.typeId().javaClass()) == null, parserContext);
+            IValueReference value;
+            if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
+                value = valueEmbedder.getEmbeddedValue();
+            } else {
+                valueBuffer.reset();
+                parseValue(fieldType, structLike.get(i, 
fieldType.typeId().javaClass()), valueBuffer.getDataOutput());
+                value = valueBuffer;
+            }
+
+            if (value != null) {
+                // Ignore missing values
+                
objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), value);
+            }
+        }
+
+        embedMissingValues(objectBuilder, parserContext, valueEmbedder);
+        objectBuilder.write(out, true);
+        valueEmbedder.exitObject();
+        parserContext.exitObject(valueBuffer, null, objectBuilder);
+    }
+
+    private void parseMap(Types.MapType mapSchema, Map<?, ?> map, DataOutput 
out) throws IOException {
+        final IMutableValueStorage item = parserContext.enterCollection();
+        final IMutableValueStorage valueBuffer = parserContext.enterObject();
+        IARecordBuilder objectBuilder = 
parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+        IAsterixListBuilder listBuilder =
+                
parserContext.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
+
+        Type keyType = mapSchema.keyType();
+        Type valueType = mapSchema.valueType();
+
+        for (Map.Entry<?, ?> entry : map.entrySet()) {
+            objectBuilder.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+            valueBuffer.reset();
+            parseValue(keyType, entry.getKey(), valueBuffer.getDataOutput());
+            
objectBuilder.addField(parserContext.getSerializedFieldName("key"), 
valueBuffer);
+            valueBuffer.reset();
+            parseValue(valueType, entry.getValue(), 
valueBuffer.getDataOutput());
+            
objectBuilder.addField(parserContext.getSerializedFieldName("value"), 
valueBuffer);
+            item.reset();
+            objectBuilder.write(item.getDataOutput(), true);
+            listBuilder.addItem(item);
+        }
+
+        listBuilder.write(out, true);
+        parserContext.exitObject(valueBuffer, null, objectBuilder);
+        parserContext.exitCollection(item, listBuilder);
     }
 
     private void serializeInteger(Object value, DataOutput out) throws 
HyracksDataException {
@@ -245,8 +271,8 @@ public class IcebergParquetDataParser extends 
AbstractDataParser implements IRec
 
     private void serializeFloat(Object value, DataOutput out) throws 
HyracksDataException {
         float floatValue = (Float) value;
-        aFloat.setValue(floatValue);
-        floatSerde.serialize(aFloat, out);
+        aDouble.setValue(floatValue);
+        doubleSerde.serialize(aDouble, out);
     }
 
     private void serializeDouble(Object value, DataOutput out) throws 
HyracksDataException {
@@ -278,6 +304,24 @@ public class IcebergParquetDataParser extends 
AbstractDataParser implements IRec
         binarySerde.serialize(aBinary, out);
     }
 
+    private void serializeFixedBinary(Object value, DataOutput out) throws 
HyracksDataException {
+        byte[] bytes = (byte[]) value;
+        aBinary.setValue(bytes, 0, bytes.length);
+        binarySerde.serialize(aBinary, out);
+    }
+
+    public void serializeDate(Object value, DataOutput output) throws 
HyracksDataException {
+        LocalDate localDate = (LocalDate) value;
+        aDate.setValue((int) localDate.toEpochDay());
+        dateSerde.serialize(aDate, output);
+    }
+
+    public void serializeTime(Object value, DataOutput output) throws 
HyracksDataException {
+        LocalTime localTime = (LocalTime) value;
+        aTime.setValue((int) (localTime.toNanoOfDay() / 1_000_000));
+        timeSerde.serialize(aTime, output);
+    }
+
     private static HyracksDataException createUnsupportedException(Type type) {
         return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Iceberg 
Parser", type.toString());
     }
@@ -289,4 +333,29 @@ public class IcebergParquetDataParser extends 
AbstractDataParser implements IRec
                     ExternalDataConstants.ParquetOptions.DECIMAL_TO_DOUBLE);
         }
     }
+
+    public static ATypeTag getTypeTag(Type type, boolean isNull, 
IcebergConverterContext parserContext)
+            throws HyracksDataException {
+        if (isNull) {
+            return ATypeTag.NULL;
+        }
+
+        return switch (type.typeId()) {
+            case BOOLEAN -> ATypeTag.BOOLEAN;
+            case INTEGER, LONG -> ATypeTag.BIGINT;
+            case FLOAT, DOUBLE -> ATypeTag.DOUBLE;
+            case STRING -> ATypeTag.STRING;
+            case UUID -> ATypeTag.UUID;
+            case FIXED, BINARY -> ATypeTag.BINARY;
+            case DECIMAL -> {
+                ensureDecimalToDoubleEnabled(type, parserContext);
+                yield ATypeTag.DOUBLE;
+            }
+            case STRUCT -> ATypeTag.OBJECT;
+            case LIST, MAP -> ATypeTag.ARRAY;
+            case DATE -> ATypeTag.DATE;
+            case TIME -> ATypeTag.TIME;
+            default -> throw createUnsupportedException(type);
+        };
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 9608108551..c026f29cff 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -318,10 +318,9 @@ public class S3Utils {
             throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED, 
srcLoc, ExternalDataConstants.KEY_FORMAT);
         }
 
-        // iceberg tables can be created without passing the bucket,
-        // only validate bucket presence if container is passed
+        // container is not needed for iceberg tables, skip validation
         String container = 
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
-        if (IcebergUtils.isIcebergTable(configuration) && container == null) {
+        if (IcebergUtils.isIcebergTable(configuration)) {
             return;
         }
 
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
index 369b5fe2a6..3569aca047 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
@@ -27,6 +27,7 @@ import java.io.IOException;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 
 import org.apache.asterix.common.config.CatalogConfig;
@@ -168,6 +169,8 @@ public class IcebergUtils {
             throw 
CompilationException.create(ErrorCode.UNSUPPORTED_ICEBERG_CATALOG_SOURCE, 
source);
         }
 
+        // remove null values to avoid failures in internal checks
+        catalogProperties.values().removeIf(Objects::isNull);
         return switch (catalogSource.get()) {
             case CatalogConfig.IcebergCatalogSource.AWS_GLUE -> 
GlueUtils.initializeCatalog(catalogProperties, namespace);
             case CatalogConfig.IcebergCatalogSource.BIGLAKE_METASTORE -> 
BiglakeMetastore.initializeCatalog(catalogProperties, namespace);
@@ -201,4 +204,14 @@ public class IcebergUtils {
         ARecordType projectedRecordType = 
ExternalDataUtils.getExpectedType(encoded);
         return projectedRecordType.getFieldNames();
     }
+
+    /**
+     * Sets the default format to Parquet if the format is not provided for 
Iceberg tables
+     * @param configuration configuration
+     */
+    public static void setDefaultFormat(Map<String, String> configuration) {
+        if (IcebergUtils.isIcebergTable(configuration) && 
configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+            configuration.put(ExternalDataConstants.KEY_FORMAT, 
ExternalDataConstants.FORMAT_PARQUET);
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 669fd20db3..943cbff1a9 100644
--- 
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++ 
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1013,7 +1013,7 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
             setSourceType(configuration, adapterName);
 
             // for iceberg table, add catalog properties to the configuration
-            addIcebergCatalogPropertiesIfNeeded(configuration);
+            addIcebergCatalogPropertiesIfNeeded(appCtx, configuration);
             return 
AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(),
 adapterName,
                     configuration, itemType, null, warningCollector, 
filterEvaluatorFactory);
         } catch (AlgebricksException e) {
@@ -1023,7 +1023,8 @@ public class MetadataProvider implements 
IMetadataProvider<DataSourceId, String>
         }
     }
 
-    private void addIcebergCatalogPropertiesIfNeeded(Map<String, String> 
configuration) throws AlgebricksException {
+    protected void addIcebergCatalogPropertiesIfNeeded(ICcApplicationContext 
appCtx, Map<String, String> configuration)
+            throws AlgebricksException {
         if (IcebergUtils.isIcebergTable(configuration)) {
             String catalogName = 
configuration.get(IcebergConstants.ICEBERG_CATALOG_NAME);
             IcebergCatalog catalog =

Reply via email to