This is an automated email from the ASF dual-hosted git repository. mblow pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/asterixdb.git
commit 6249ff58f37131cb5ab475f7fe2a98a930bac186 Author: Peeyush Gupta <[email protected]> AuthorDate: Mon Nov 18 17:36:24 2024 -0800 [ASTERIXDB-3503][EXT] Add column filter for Delta Reader. - user model changes: no - storage format changes: no - interface changes: no Details: Reading only the required columns from the delta table. Ext-ref: MB-63840 Change-Id: I809c692777349025d5ce0435c3a6068d432cd282 Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19085 Integration-Tests: Jenkins <[email protected]> Reviewed-by: Murtadha Hubail <[email protected]> Tested-by: Murtadha Hubail <[email protected]> --- .../aws/delta/AsterixTypeToDeltaTypeVisitor.java | 145 +++++++++++++++++++++ .../reader/aws/delta/AwsS3DeltaReaderFactory.java | 39 +++++- .../aws/delta/converter/DeltaConverterContext.java | 10 +- .../asterix/external/parser/DeltaDataParser.java | 13 +- .../asterix/external/util/ExternalDataUtils.java | 2 +- 5 files changed, 202 insertions(+), 7 deletions(-) diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java new file mode 100644 index 0000000000..2c86132490 --- /dev/null +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AsterixTypeToDeltaTypeVisitor.java @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.asterix.external.input.record.reader.aws.delta; + +import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.ALL_FIELDS_TYPE; +import static org.apache.asterix.om.utils.ProjectionFiltrationTypeUtil.EMPTY_TYPE; + +import java.util.Map; + +import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext; +import org.apache.asterix.external.parser.DeltaDataParser; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.om.types.ATypeTag; +import org.apache.asterix.om.types.AUnionType; +import org.apache.asterix.om.types.AbstractCollectionType; +import org.apache.asterix.om.types.IAType; +import org.apache.asterix.om.types.IATypeVisitor; +import org.apache.asterix.runtime.projection.FunctionCallInformation; +import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.Warning; + +import io.delta.kernel.types.ArrayType; +import io.delta.kernel.types.DataType; +import io.delta.kernel.types.StructType; + +/** + * This visitor clips the filesSchema stored in Delta table metadata using the expected type + */ +public class AsterixTypeToDeltaTypeVisitor implements IATypeVisitor<DataType, DataType> { + + private final DeltaConverterContext context; + private Map<String, FunctionCallInformation> funcInfo; + + public AsterixTypeToDeltaTypeVisitor(DeltaConverterContext context) { + this.context = context; + } + + public StructType clipType(ARecordType rootType, StructType fileSchema, + Map<String, FunctionCallInformation> funcInfo) { + if (rootType == EMPTY_TYPE) { + return new StructType(); + } else if (rootType == ALL_FIELDS_TYPE) { + return fileSchema; + } + StructType builder = new StructType(); + this.funcInfo = funcInfo; + return clipObjectChildren(builder, rootType, fileSchema); + } + + @Override + public DataType visit(ARecordType recordType, DataType arg) { + if (isNotCompatibleType(arg, recordType)) { + return null; + } + StructType builder = new StructType(); + builder = clipObjectChildren(builder, recordType, (StructType) arg); + if (builder.fields().size() == 0) { + return null; + } + return builder; + } + + @Override + public DataType visit(AbstractCollectionType collectionType, DataType arg) { + if (isNotCompatibleType(arg, collectionType)) { + return null; + } + DataType elementSchema = ((ArrayType) arg).getElementType(); + DataType requestedChildType = collectionType.getItemType().accept(this, elementSchema); + return new ArrayType(requestedChildType, true); + } + + private StructType clipObjectChildren(StructType builder, ARecordType recordType, StructType arg) { + String[] fieldNames = recordType.getFieldNames(); + IAType[] fieldTypes = recordType.getFieldTypes(); + for (int i = 0; i < fieldNames.length; i++) { + // If the field is not present in the file schema, we skip it + if (arg.fieldNames().contains(fieldNames[i])) { + DataType type = arg.get(fieldNames[i]).getDataType(); + DataType childType = fieldTypes[i].accept(this, type); + if (childType != null) { + // We only add non-MISSING children + builder = builder.add(fieldNames[i], childType); + } + } + } + return builder; + } + + @Override + public DataType visit(AUnionType unionType, DataType arg) { + if (arg instanceof ArrayType) { + return unionType.getType(ATypeTag.ARRAY).accept(this, arg); + } else { + return unionType.getType(ATypeTag.OBJECT).accept(this, arg); + } + } + + @Override + public DataType visitFlat(IAType node, DataType arg) { + return arg; + } + + private boolean isNotCompatibleType(DataType type, IAType node) { + // typeName is unique + FunctionCallInformation info = funcInfo.get(node.getTypeName()); + ATypeTag actualType = null; + try { + actualType = DeltaDataParser.getTypeTag(type, false, context); + } catch (HyracksDataException e) { + throw new RuntimeException(e); + } + ATypeTag expectedType = node.getTypeTag(); + + boolean isNotExpected = actualType != expectedType; + if (isNotExpected) { + //If no warning is created, then it means it has been reported + Warning warning = null; + if (actualType != ATypeTag.SYSTEM_NULL) { + warning = info.createWarning(expectedType, actualType); + } + if (warning != null) { + //New warning that we saw for the first time. We should report it. + context.getWarnings().add(warning); + } + } + return isNotExpected; + } +} diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java index 9909cc32bd..5ce2c78152 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/AwsS3DeltaReaderFactory.java @@ -20,6 +20,7 @@ package org.apache.asterix.external.input.record.reader.aws.delta; import static org.apache.asterix.external.util.aws.s3.S3Constants.SERVICE_END_POINT_FIELD_NAME; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; @@ -34,8 +35,12 @@ import org.apache.asterix.common.external.IExternalFilterEvaluatorFactory; import org.apache.asterix.external.api.IExternalDataRuntimeContext; import org.apache.asterix.external.api.IRecordReader; import org.apache.asterix.external.api.IRecordReaderFactory; +import org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext; import org.apache.asterix.external.util.ExternalDataConstants; +import org.apache.asterix.external.util.HDFSUtils; import org.apache.asterix.external.util.aws.s3.S3Constants; +import org.apache.asterix.om.types.ARecordType; +import org.apache.asterix.runtime.projection.FunctionCallInformation; import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hyracks.algebricks.common.constraints.AlgebricksAbsolutePartitionConstraint; @@ -43,6 +48,7 @@ import org.apache.hyracks.algebricks.common.exceptions.AlgebricksException; import org.apache.hyracks.api.application.IServiceContext; import org.apache.hyracks.api.exceptions.HyracksDataException; import org.apache.hyracks.api.exceptions.IWarningCollector; +import org.apache.hyracks.api.exceptions.Warning; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -52,6 +58,7 @@ import io.delta.kernel.data.FilteredColumnarBatch; import io.delta.kernel.data.Row; import io.delta.kernel.defaults.engine.DefaultEngine; import io.delta.kernel.engine.Engine; +import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { @@ -87,6 +94,10 @@ public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { if (serviceEndpoint != null) { conf.set(S3Constants.HADOOP_SERVICE_END_POINT, serviceEndpoint); } + conf.set(ExternalDataConstants.KEY_REQUESTED_FIELDS, + configuration.getOrDefault(ExternalDataConstants.KEY_REQUESTED_FIELDS, "")); + conf.set(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, + configuration.getOrDefault(ExternalDataConstants.KEY_HADOOP_ASTERIX_FUNCTION_CALL_INFORMATION, "")); String tableMetadataPath = S3Constants.HADOOP_S3_PROTOCOL + "://" + configuration.get(ExternalDataConstants.CONTAINER_NAME_FIELD_NAME) + '/' + configuration.get(ExternalDataConstants.DEFINITION_FIELD_NAME); @@ -96,7 +107,21 @@ public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { Engine engine = DefaultEngine.create(conf); io.delta.kernel.Table table = io.delta.kernel.Table.forPath(engine, tableMetadataPath); Snapshot snapshot = table.getLatestSnapshot(engine); - Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, snapshot.getSchema(engine)).build(); + + List<Warning> warnings = new ArrayList<>(); + DeltaConverterContext converterContext = new DeltaConverterContext(configuration, warnings); + AsterixTypeToDeltaTypeVisitor visitor = new AsterixTypeToDeltaTypeVisitor(converterContext); + StructType requiredSchema; + try { + ARecordType expectedType = HDFSUtils.getExpectedType(conf); + Map<String, FunctionCallInformation> functionCallInformationMap = + HDFSUtils.getFunctionCallInformationMap(conf); + StructType fileSchema = snapshot.getSchema(engine); + requiredSchema = visitor.clipType(expectedType, fileSchema, functionCallInformationMap); + } catch (IOException e) { + throw new RuntimeException(e); + } + Scan scan = snapshot.getScanBuilder(engine).withReadSchema(engine, requiredSchema).build(); scanState = RowSerDe.serializeRowToJson(scan.getScanState(engine)); CloseableIterator<FilteredColumnarBatch> iter = scan.getScanFiles(engine); @@ -112,6 +137,18 @@ public class AwsS3DeltaReaderFactory implements IRecordReaderFactory<Object> { locationConstraints = configureLocationConstraints(appCtx); configuration.put(ExternalDataConstants.KEY_PARSER, ExternalDataConstants.FORMAT_DELTA); distributeFiles(); + issueWarnings(warnings, warningCollector); + } + + private void issueWarnings(List<Warning> warnings, IWarningCollector warningCollector) { + if (!warnings.isEmpty() && warningCollector.shouldWarn()) { + for (Warning warning : warnings) { + if (warningCollector.shouldWarn()) { + warningCollector.warn(warning); + } + } + } + warnings.clear(); } private AlgebricksAbsolutePartitionConstraint configureLocationConstraints(ICcApplicationContext appCtx) { diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java index 81e465c100..a404e2fbc6 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java @@ -19,6 +19,7 @@ package org.apache.asterix.external.input.record.reader.aws.delta.converter; import java.io.DataOutput; +import java.util.List; import java.util.Map; import java.util.TimeZone; @@ -32,6 +33,7 @@ import org.apache.asterix.om.base.AMutableDateTime; import org.apache.asterix.om.types.BuiltinType; import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.Warning; public class DeltaConverterContext extends ParserContext { @SuppressWarnings("unchecked") @@ -47,8 +49,10 @@ public class DeltaConverterContext extends ParserContext { private final int timeZoneOffset; private final AMutableDate mutableDate = new AMutableDate(0); private final AMutableDateTime mutableDateTime = new AMutableDateTime(0); + private final List<Warning> warnings; - public DeltaConverterContext(Map<String, String> configuration) { + public DeltaConverterContext(Map<String, String> configuration, List<Warning> warnings) { + this.warnings = warnings; decimalToDouble = Boolean.parseBoolean(configuration .getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, ExternalDataConstants.FALSE)); timestampAsLong = Boolean.parseBoolean(configuration @@ -96,4 +100,8 @@ public class DeltaConverterContext extends ParserContext { public boolean isDateAsInt() { return dateAsInt; } + + public List<Warning> getWarnings() { + return warnings; + } } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java index ea02d77acb..adb846d270 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java @@ -23,6 +23,8 @@ import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString; import java.io.DataOutput; import java.io.IOException; import java.math.BigDecimal; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -42,6 +44,7 @@ import org.apache.asterix.om.pointables.base.DefaultOpenFieldType; import org.apache.asterix.om.types.ATypeTag; import org.apache.avro.AvroRuntimeException; import org.apache.hyracks.api.exceptions.HyracksDataException; +import org.apache.hyracks.api.exceptions.Warning; import org.apache.hyracks.data.std.api.IMutableValueStorage; import org.apache.hyracks.data.std.api.IValueReference; @@ -68,7 +71,8 @@ public class DeltaDataParser extends AbstractDataParser implements IRecordDataPa private final IExternalFilterValueEmbedder valueEmbedder; public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, String> conf) { - parserContext = new DeltaConverterContext(conf); + List<Warning> warnings = new ArrayList<>(); + parserContext = new DeltaConverterContext(conf, warnings); valueEmbedder = context.getValueEmbedder(); } @@ -91,7 +95,7 @@ public class DeltaDataParser extends AbstractDataParser implements IRecordDataPa for (int i = 0; i < schema.fields().size(); i++) { DataType fieldSchema = schema.fields().get(i).getDataType(); String fieldName = schema.fieldNames().get(i); - ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i)); + ATypeTag typeTag = getTypeTag(fieldSchema, record.isNullAt(i), parserContext); IValueReference value = null; if (valueEmbedder.shouldEmbed(fieldName, typeTag)) { value = valueEmbedder.getEmbeddedValue(); @@ -120,7 +124,7 @@ public class DeltaDataParser extends AbstractDataParser implements IRecordDataPa for (int i = 0; i < schema.fields().size(); i++) { DataType fieldSchema = schema.fields().get(i).getDataType(); String fieldName = schema.fieldNames().get(i); - ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index)); + ATypeTag typeTag = getTypeTag(fieldSchema, column.getChild(i).isNullAt(index), parserContext); IValueReference value = null; if (valueEmbedder.shouldEmbed(fieldName, typeTag)) { value = valueEmbedder.getEmbeddedValue(); @@ -157,7 +161,8 @@ public class DeltaDataParser extends AbstractDataParser implements IRecordDataPa parserContext.exitCollection(valueBuffer, arrayBuilder); } - private ATypeTag getTypeTag(DataType schema, boolean isNull) throws HyracksDataException { + public static ATypeTag getTypeTag(DataType schema, boolean isNull, DeltaConverterContext parserContext) + throws HyracksDataException { if (isNull) { return ATypeTag.NULL; } diff --git a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java index bc5b8c3326..50e72ed047 100644 --- a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java +++ b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java @@ -922,7 +922,7 @@ public class ExternalDataUtils { public static boolean supportsPushdown(Map<String, String> properties) { //Currently, only Apache Parquet format is supported - return isParquetFormat(properties); + return isParquetFormat(properties) || isDeltaTable(properties); } /**
