This is an automated email from the ASF dual-hosted git repository.
mblow pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new 200b2f8c71 [ASTERIXDB-3634][EXT]: Add support to Iceberg pt.3
200b2f8c71 is described below
commit 200b2f8c710d7b72439d41819681d19c33d3610f
Author: Hussain Towaileb <[email protected]>
AuthorDate: Sun Sep 21 16:46:35 2025 +0300
[ASTERIXDB-3634][EXT]: Add support to Iceberg pt.3
Details:
- Support parquet format by default unless format is provided.
- Allow some methods to be overridden by extensions.
- Remove null property values before initializing the catalog.
- Disable failing test.
- Support iceberg complex types + date + time
Ext-ref: MB-63115
Change-Id: I1726c2168bfec1f137390c7c2112c2df59151dc2
Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/20648
Reviewed-by: Hussain Towaileb <[email protected]>
Tested-by: Jenkins <[email protected]>
Reviewed-by: Peeyush Gupta <[email protected]>
Integration-Tests: Jenkins <[email protected]>
---
.../asterix/app/translator/QueryTranslator.java | 3 +-
.../runtimets/testsuite_external_dataset_s3.xml | 2 +
.../external/parser/IcebergParquetDataParser.java | 225 ++++++++++++++-------
.../asterix/external/util/aws/s3/S3Utils.java | 5 +-
.../external/util/iceberg/IcebergUtils.java | 13 ++
.../metadata/declared/MetadataProvider.java | 5 +-
6 files changed, 169 insertions(+), 84 deletions(-)
diff --git
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
index 0946e4a4a3..3a6274647d 100644
---
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
+++
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/translator/QueryTranslator.java
@@ -1187,11 +1187,12 @@ public class QueryTranslator extends
AbstractLangTranslator implements IStatemen
return Optional.of(dataset);
}
- private void validateIfIcebergTable(Map<String, String> properties,
MetadataTransactionContext mdTxnCtx,
+ protected void validateIfIcebergTable(Map<String, String> properties,
MetadataTransactionContext mdTxnCtx,
SourceLocation srcLoc) throws AlgebricksException {
if (!IcebergUtils.isIcebergTable(properties)) {
return;
}
+ IcebergUtils.setDefaultFormat(properties);
// ensure the specified catalog exists
String catalogName =
properties.get(IcebergConstants.ICEBERG_CATALOG_NAME);
diff --git
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
index c6d6d32035..ae4154ed54 100644
---
a/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
+++
b/asterixdb/asterix-app/src/test/resources/runtimets/testsuite_external_dataset_s3.xml
@@ -1479,12 +1479,14 @@
<expected-error>ASX1178: Unsupported iceberg
table</expected-error>
</compilation-unit>
</test-case>
+ <!-- old iceberg test, check why failing
<test-case FilePath="external-dataset/s3">
<compilation-unit name="iceberg-mixed-data-format">
<output-dir compare="Text">none</output-dir>
<expected-error>avro-file.avro. Reason: not a Parquet
file</expected-error>
</compilation-unit>
</test-case>
+ -->
<test-case FilePath="external-dataset/s3">
<compilation-unit name="iceberg-empty">
<output-dir compare="Text">iceberg-empty</output-dir>
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
index 634b44d606..5afddc37b7 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/IcebergParquetDataParser.java
@@ -25,6 +25,8 @@ import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -45,16 +47,17 @@ import org.apache.asterix.om.base.ANull;
import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
import org.apache.asterix.om.types.ATypeTag;
import org.apache.avro.AvroRuntimeException;
-import org.apache.hyracks.algebricks.common.exceptions.NotImplementedException;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.Warning;
import org.apache.hyracks.data.std.api.IMutableValueStorage;
import org.apache.hyracks.data.std.api.IValueReference;
import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
import org.apache.iceberg.data.Record;
import org.apache.iceberg.types.Type;
import org.apache.iceberg.types.Types;
import org.apache.iceberg.types.Types.NestedField;
+import org.apache.iceberg.types.Types.StructType;
public class IcebergParquetDataParser extends AbstractDataParser implements
IRecordDataParser<Record> {
private final IcebergConverterContext parserContext;
@@ -72,7 +75,7 @@ public class IcebergParquetDataParser extends
AbstractDataParser implements IRec
@Override
public boolean parse(IRawRecord<? extends Record> record, DataOutput out)
throws HyracksDataException {
try {
- parseObject(record.get(), out);
+ parseRootObject(record.get(), out);
valueEmbedder.reset();
return true;
} catch (AvroRuntimeException | IOException e) {
@@ -80,7 +83,7 @@ public class IcebergParquetDataParser extends
AbstractDataParser implements IRec
}
}
- private void parseObject(Record record, DataOutput out) throws IOException
{
+ private void parseRootObject(Record record, DataOutput out) throws
IOException {
IMutableValueStorage valueBuffer = parserContext.enterObject();
IARecordBuilder objectBuilder =
parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
valueEmbedder.enterObject();
@@ -94,7 +97,7 @@ public class IcebergParquetDataParser extends
AbstractDataParser implements IRec
value = valueEmbedder.getEmbeddedValue();
} else {
valueBuffer.reset();
- parseValue(fieldType, record, i, valueBuffer.getDataOutput());
+ parseValue(fieldType, record.get(i),
valueBuffer.getDataOutput());
value = valueBuffer;
}
@@ -110,70 +113,7 @@ public class IcebergParquetDataParser extends
AbstractDataParser implements IRec
parserContext.exitObject(valueBuffer, null, objectBuilder);
}
- private void parseArray(Type arrayType, boolean isOptional, List<?>
listValues, DataOutput out) throws IOException {
- if (listValues == null) {
- nullSerde.serialize(ANull.NULL, out);
- return;
- }
- final IMutableValueStorage valueBuffer =
parserContext.enterCollection();
- final IAsterixListBuilder arrayBuilder =
parserContext.getCollectionBuilder(NESTED_OPEN_AORDERED_LIST_TYPE);
- for (int i = 0; i < listValues.size(); i++) {
- valueBuffer.reset();
- //parseValue(elementSchema, elements, i,
valueBuffer.getDataOutput());
- arrayBuilder.addItem(valueBuffer);
- }
- arrayBuilder.write(out, true);
- parserContext.exitCollection(valueBuffer, arrayBuilder);
- }
-
- public static ATypeTag getTypeTag(Type type, boolean isNull,
IcebergConverterContext parserContext)
- throws HyracksDataException {
- if (isNull) {
- return ATypeTag.NULL;
- }
-
- switch (type.typeId()) {
- case BOOLEAN:
- return ATypeTag.BOOLEAN;
- case INTEGER:
- case LONG:
- return ATypeTag.BIGINT;
- case FLOAT:
- return ATypeTag.FLOAT;
- case DOUBLE:
- return ATypeTag.DOUBLE;
- case STRING:
- return ATypeTag.STRING;
- case UUID:
- return ATypeTag.UUID;
- case BINARY:
- return ATypeTag.BINARY;
- case DECIMAL:
- ensureDecimalToDoubleEnabled(type, parserContext);
- return ATypeTag.DOUBLE;
- case STRUCT:
- return ATypeTag.OBJECT;
- case LIST:
- return ATypeTag.ARRAY;
- case DATE:
- case TIME:
- case TIMESTAMP:
- case TIMESTAMP_NANO:
- case FIXED:
- case GEOMETRY:
- case GEOGRAPHY:
- case MAP:
- case VARIANT:
- case UNKNOWN:
- throw new NotImplementedException();
- default:
- throw createUnsupportedException(type);
-
- }
- }
-
- private void parseValue(Type fieldType, Record record, int index,
DataOutput out) throws IOException {
- Object value = record.get(index);
+ private void parseValue(Type fieldType, Object value, DataOutput out)
throws IOException {
if (value == null) {
nullSerde.serialize(ANull.NULL, out);
return;
@@ -190,7 +130,6 @@ public class IcebergParquetDataParser extends
AbstractDataParser implements IRec
serializeLong(value, out);
return;
case FLOAT:
- // TODO: should this be parsed as double?
serializeFloat(value, out);
return;
case DOUBLE:
@@ -202,6 +141,9 @@ public class IcebergParquetDataParser extends
AbstractDataParser implements IRec
case UUID:
serializeUuid(value, out);
return;
+ case FIXED:
+ serializeFixedBinary(value, out);
+ return;
case BINARY:
serializeBinary(value, out);
return;
@@ -209,26 +151,110 @@ public class IcebergParquetDataParser extends
AbstractDataParser implements IRec
ensureDecimalToDoubleEnabled(fieldType, parserContext);
serializeDecimal((BigDecimal) value, out);
return;
- case STRUCT:
- parseObject((Record) value, out);
- return;
case LIST:
Types.ListType listType = fieldType.asListType();
- parseArray(listType.elementType(),
listType.isElementOptional(), (List<?>) value, out);
+ parseArray(listType, (List<?>) value, out);
+ return;
+ case STRUCT:
+ parseObject((StructType) fieldType, (StructLike) value, out);
+ return;
+ case MAP:
+ Types.MapType mapType = fieldType.asMapType();
+ parseMap(mapType, (Map<?, ?>) value, out);
return;
case DATE:
+ serializeDate(value, out);
+ return;
case TIME:
+ serializeTime(value, out);
+ return;
case TIMESTAMP:
case TIMESTAMP_NANO:
- case FIXED:
case GEOMETRY:
case GEOGRAPHY:
- case MAP:
case VARIANT:
case UNKNOWN:
- throw new NotImplementedException();
+ default:
+ throw createUnsupportedException(fieldType);
+
+ }
+ }
+
+ private void parseArray(Types.ListType listType, List<?> listValues,
DataOutput out) throws IOException {
+ if (listValues == null) {
+ nullSerde.serialize(ANull.NULL, out);
+ return;
+ }
+ Type elementType = listType.elementType();
+ final IMutableValueStorage valueBuffer =
parserContext.enterCollection();
+ final IAsterixListBuilder arrayBuilder =
parserContext.getCollectionBuilder(NESTED_OPEN_AORDERED_LIST_TYPE);
+ for (Object listValue : listValues) {
+ valueBuffer.reset();
+ parseValue(elementType, listValue, valueBuffer.getDataOutput());
+ arrayBuilder.addItem(valueBuffer);
}
+ arrayBuilder.write(out, true);
+ parserContext.exitCollection(valueBuffer, arrayBuilder);
+ }
+
+ private void parseObject(StructType schema, StructLike structLike,
DataOutput out) throws IOException {
+ IMutableValueStorage valueBuffer = parserContext.enterObject();
+ IARecordBuilder objectBuilder =
parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ valueEmbedder.enterObject();
+ for (int i = 0; i < schema.fields().size(); i++) {
+ NestedField field = schema.fields().get(i);
+ String fieldName = field.name();
+ Type fieldType = field.type();
+ ATypeTag typeTag =
+ getTypeTag(fieldType, structLike.get(i,
fieldType.typeId().javaClass()) == null, parserContext);
+ IValueReference value;
+ if (valueEmbedder.shouldEmbed(fieldName, typeTag)) {
+ value = valueEmbedder.getEmbeddedValue();
+ } else {
+ valueBuffer.reset();
+ parseValue(fieldType, structLike.get(i,
fieldType.typeId().javaClass()), valueBuffer.getDataOutput());
+ value = valueBuffer;
+ }
+
+ if (value != null) {
+ // Ignore missing values
+
objectBuilder.addField(parserContext.getSerializedFieldName(fieldName), value);
+ }
+ }
+
+ embedMissingValues(objectBuilder, parserContext, valueEmbedder);
+ objectBuilder.write(out, true);
+ valueEmbedder.exitObject();
+ parserContext.exitObject(valueBuffer, null, objectBuilder);
+ }
+
+ private void parseMap(Types.MapType mapSchema, Map<?, ?> map, DataOutput
out) throws IOException {
+ final IMutableValueStorage item = parserContext.enterCollection();
+ final IMutableValueStorage valueBuffer = parserContext.enterObject();
+ IARecordBuilder objectBuilder =
parserContext.getObjectBuilder(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ IAsterixListBuilder listBuilder =
+
parserContext.getCollectionBuilder(DefaultOpenFieldType.NESTED_OPEN_AORDERED_LIST_TYPE);
+
+ Type keyType = mapSchema.keyType();
+ Type valueType = mapSchema.valueType();
+
+ for (Map.Entry<?, ?> entry : map.entrySet()) {
+ objectBuilder.reset(DefaultOpenFieldType.NESTED_OPEN_RECORD_TYPE);
+ valueBuffer.reset();
+ parseValue(keyType, entry.getKey(), valueBuffer.getDataOutput());
+
objectBuilder.addField(parserContext.getSerializedFieldName("key"),
valueBuffer);
+ valueBuffer.reset();
+ parseValue(valueType, entry.getValue(),
valueBuffer.getDataOutput());
+
objectBuilder.addField(parserContext.getSerializedFieldName("value"),
valueBuffer);
+ item.reset();
+ objectBuilder.write(item.getDataOutput(), true);
+ listBuilder.addItem(item);
+ }
+
+ listBuilder.write(out, true);
+ parserContext.exitObject(valueBuffer, null, objectBuilder);
+ parserContext.exitCollection(item, listBuilder);
}
private void serializeInteger(Object value, DataOutput out) throws
HyracksDataException {
@@ -245,8 +271,8 @@ public class IcebergParquetDataParser extends
AbstractDataParser implements IRec
private void serializeFloat(Object value, DataOutput out) throws
HyracksDataException {
float floatValue = (Float) value;
- aFloat.setValue(floatValue);
- floatSerde.serialize(aFloat, out);
+ aDouble.setValue(floatValue);
+ doubleSerde.serialize(aDouble, out);
}
private void serializeDouble(Object value, DataOutput out) throws
HyracksDataException {
@@ -278,6 +304,24 @@ public class IcebergParquetDataParser extends
AbstractDataParser implements IRec
binarySerde.serialize(aBinary, out);
}
+ private void serializeFixedBinary(Object value, DataOutput out) throws
HyracksDataException {
+ byte[] bytes = (byte[]) value;
+ aBinary.setValue(bytes, 0, bytes.length);
+ binarySerde.serialize(aBinary, out);
+ }
+
+ public void serializeDate(Object value, DataOutput output) throws
HyracksDataException {
+ LocalDate localDate = (LocalDate) value;
+ aDate.setValue((int) localDate.toEpochDay());
+ dateSerde.serialize(aDate, output);
+ }
+
+ public void serializeTime(Object value, DataOutput output) throws
HyracksDataException {
+ LocalTime localTime = (LocalTime) value;
+ aTime.setValue((int) (localTime.toNanoOfDay() / 1_000_000));
+ timeSerde.serialize(aTime, output);
+ }
+
private static HyracksDataException createUnsupportedException(Type type) {
return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Iceberg
Parser", type.toString());
}
@@ -289,4 +333,29 @@ public class IcebergParquetDataParser extends
AbstractDataParser implements IRec
ExternalDataConstants.ParquetOptions.DECIMAL_TO_DOUBLE);
}
}
+
+ public static ATypeTag getTypeTag(Type type, boolean isNull,
IcebergConverterContext parserContext)
+ throws HyracksDataException {
+ if (isNull) {
+ return ATypeTag.NULL;
+ }
+
+ return switch (type.typeId()) {
+ case BOOLEAN -> ATypeTag.BOOLEAN;
+ case INTEGER, LONG -> ATypeTag.BIGINT;
+ case FLOAT, DOUBLE -> ATypeTag.DOUBLE;
+ case STRING -> ATypeTag.STRING;
+ case UUID -> ATypeTag.UUID;
+ case FIXED, BINARY -> ATypeTag.BINARY;
+ case DECIMAL -> {
+ ensureDecimalToDoubleEnabled(type, parserContext);
+ yield ATypeTag.DOUBLE;
+ }
+ case STRUCT -> ATypeTag.OBJECT;
+ case LIST, MAP -> ATypeTag.ARRAY;
+ case DATE -> ATypeTag.DATE;
+ case TIME -> ATypeTag.TIME;
+ default -> throw createUnsupportedException(type);
+ };
+ }
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
index 9608108551..c026f29cff 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/aws/s3/S3Utils.java
@@ -318,10 +318,9 @@ public class S3Utils {
throw new CompilationException(ErrorCode.PARAMETERS_REQUIRED,
srcLoc, ExternalDataConstants.KEY_FORMAT);
}
- // iceberg tables can be created without passing the bucket,
- // only validate bucket presence if container is passed
+ // container is not needed for iceberg tables, skip validation
String container =
configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME);
- if (IcebergUtils.isIcebergTable(configuration) && container == null) {
+ if (IcebergUtils.isIcebergTable(configuration)) {
return;
}
diff --git
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
index 369b5fe2a6..3569aca047 100644
---
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
+++
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/iceberg/IcebergUtils.java
@@ -27,6 +27,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import org.apache.asterix.common.config.CatalogConfig;
@@ -168,6 +169,8 @@ public class IcebergUtils {
throw
CompilationException.create(ErrorCode.UNSUPPORTED_ICEBERG_CATALOG_SOURCE,
source);
}
+ // remove null values to avoid failures in internal checks
+ catalogProperties.values().removeIf(Objects::isNull);
return switch (catalogSource.get()) {
case CatalogConfig.IcebergCatalogSource.AWS_GLUE ->
GlueUtils.initializeCatalog(catalogProperties, namespace);
case CatalogConfig.IcebergCatalogSource.BIGLAKE_METASTORE ->
BiglakeMetastore.initializeCatalog(catalogProperties, namespace);
@@ -201,4 +204,14 @@ public class IcebergUtils {
ARecordType projectedRecordType =
ExternalDataUtils.getExpectedType(encoded);
return projectedRecordType.getFieldNames();
}
+
+ /**
+ * Sets the default format to Parquet if the format is not provided for
Iceberg tables
+ * @param configuration configuration
+ */
+ public static void setDefaultFormat(Map<String, String> configuration) {
+ if (IcebergUtils.isIcebergTable(configuration) &&
configuration.get(ExternalDataConstants.KEY_FORMAT) == null) {
+ configuration.put(ExternalDataConstants.KEY_FORMAT,
ExternalDataConstants.FORMAT_PARQUET);
+ }
+ }
}
diff --git
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
index 669fd20db3..943cbff1a9 100644
---
a/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
+++
b/asterixdb/asterix-metadata/src/main/java/org/apache/asterix/metadata/declared/MetadataProvider.java
@@ -1013,7 +1013,7 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
setSourceType(configuration, adapterName);
// for iceberg table, add catalog properties to the configuration
- addIcebergCatalogPropertiesIfNeeded(configuration);
+ addIcebergCatalogPropertiesIfNeeded(appCtx, configuration);
return
AdapterFactoryProvider.getAdapterFactory(getApplicationContext().getServiceContext(),
adapterName,
configuration, itemType, null, warningCollector,
filterEvaluatorFactory);
} catch (AlgebricksException e) {
@@ -1023,7 +1023,8 @@ public class MetadataProvider implements
IMetadataProvider<DataSourceId, String>
}
}
- private void addIcebergCatalogPropertiesIfNeeded(Map<String, String>
configuration) throws AlgebricksException {
+ protected void addIcebergCatalogPropertiesIfNeeded(ICcApplicationContext
appCtx, Map<String, String> configuration)
+ throws AlgebricksException {
if (IcebergUtils.isIcebergTable(configuration)) {
String catalogName =
configuration.get(IcebergConstants.ICEBERG_CATALOG_NAME);
IcebergCatalog catalog =