This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git

commit 67d837a220d9dcd168a539cfe40d1c4ca230c1ce
Author: Murtadha Hubail <[email protected]>
AuthorDate: Fri Nov 15 02:04:54 2024 +0300

    [ASTERIXDB-3503][EXT] Add optional parsing parameters for delta
    
    - user model changes: yes
    - storage format changes: no
    - interface changes: no
    
    Details:
    - Add the following optional parsing parameters for delta:
       -- timestamp-to-long: parse timestamp as long; otherwise as ADateTime
       -- date-to-int: parse date as int; otherwise as ADate
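       -- decimal-to-double: parse decimal as double; otherwise decimal is rejected
       -- timezone: time zone ID whose raw offset is added to (non-NTZ) timestamps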
    
    Ext-ref: MB-63840
    
    Change-Id: I6fa3d46588116508716fc1abd693f75ee5538d7f
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/19068
    Reviewed-by: Ali Alsuliman <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
---
 .../aws/delta/converter/DeltaConverterContext.java | 99 ++++++++++++++++++++++
 .../asterix/external/parser/DeltaDataParser.java   | 85 +++++++++++++------
 .../factory/DeltaTableDataParserFactory.java       |  3 +-
 .../external/util/ExternalDataConstants.java       | 16 ++--
 .../asterix/external/util/ExternalDataUtils.java   | 11 ++-
 5 files changed, 181 insertions(+), 33 deletions(-)

diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
new file mode 100644
index 0000000000..81e465c100
--- /dev/null
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/input/record/reader/aws/delta/converter/DeltaConverterContext.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.asterix.external.input.record.reader.aws.delta.converter;
+
+import java.io.DataOutput;
+import java.util.Map;
+import java.util.TimeZone;
+
+import org.apache.asterix.external.parser.jackson.ParserContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
+import org.apache.asterix.formats.nontagged.SerializerDeserializerProvider;
+import org.apache.asterix.om.base.ADate;
+import org.apache.asterix.om.base.ADateTime;
+import org.apache.asterix.om.base.AMutableDate;
+import org.apache.asterix.om.base.AMutableDateTime;
+import org.apache.asterix.om.types.BuiltinType;
+import org.apache.hyracks.api.dataflow.value.ISerializerDeserializer;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
+
+public class DeltaConverterContext extends ParserContext {
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADate> dateSerDer =
+            
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATE);
+    @SuppressWarnings("unchecked")
+    private final ISerializerDeserializer<ADateTime> datetimeSerDer =
+            
SerializerDeserializerProvider.INSTANCE.getSerializerDeserializer(BuiltinType.ADATETIME);
+    private final boolean decimalToDouble;
+    private final boolean timestampAsLong;
+    private final boolean dateAsInt;
+
+    private final int timeZoneOffset;
+    private final AMutableDate mutableDate = new AMutableDate(0);
+    private final AMutableDateTime mutableDateTime = new AMutableDateTime(0);
+
+    public DeltaConverterContext(Map<String, String> configuration) {
+        decimalToDouble = Boolean.parseBoolean(configuration
+                
.getOrDefault(ExternalDataConstants.DeltaOptions.DECIMAL_TO_DOUBLE, 
ExternalDataConstants.FALSE));
+        timestampAsLong = Boolean.parseBoolean(configuration
+                
.getOrDefault(ExternalDataConstants.DeltaOptions.TIMESTAMP_AS_LONG, 
ExternalDataConstants.TRUE));
+        dateAsInt = Boolean.parseBoolean(
+                
configuration.getOrDefault(ExternalDataConstants.DeltaOptions.DATE_AS_INT, 
ExternalDataConstants.TRUE));
+        String configuredTimeZoneId = 
configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE);
+        if (configuredTimeZoneId != null && !configuredTimeZoneId.isEmpty()) {
+            timeZoneOffset = 
TimeZone.getTimeZone(configuredTimeZoneId).getRawOffset();
+        } else {
+            timeZoneOffset = 0;
+        }
+    }
+
+    public void serializeDate(int value, DataOutput output) {
+        try {
+            mutableDate.setValue(value);
+            dateSerDer.serialize(mutableDate, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public void serializeDateTime(long timestamp, DataOutput output) {
+        try {
+            mutableDateTime.setValue(timestamp);
+            datetimeSerDer.serialize(mutableDateTime, output);
+        } catch (HyracksDataException e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public boolean isDecimalToDoubleEnabled() {
+        return decimalToDouble;
+    }
+
+    public int getTimeZoneOffset() {
+        return timeZoneOffset;
+    }
+
+    public boolean isTimestampAsLong() {
+        return timestampAsLong;
+    }
+
+    public boolean isDateAsInt() {
+        return dateAsInt;
+    }
+}
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
index e56be867bd..ea02d77acb 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/DeltaDataParser.java
@@ -18,12 +18,12 @@
  */
 package org.apache.asterix.external.parser;
 
-import static org.apache.avro.Schema.Type.NULL;
 import static org.apache.hyracks.api.util.ExceptionUtils.getMessageOrToString;
 
 import java.io.DataOutput;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.asterix.builders.IARecordBuilder;
@@ -34,7 +34,8 @@ import 
org.apache.asterix.external.api.IExternalDataRuntimeContext;
 import org.apache.asterix.external.api.IRawRecord;
 import org.apache.asterix.external.api.IRecordDataParser;
 import 
org.apache.asterix.external.input.filter.embedder.IExternalFilterValueEmbedder;
-import org.apache.asterix.external.parser.jackson.ParserContext;
+import 
org.apache.asterix.external.input.record.reader.aws.delta.converter.DeltaConverterContext;
+import org.apache.asterix.external.util.ExternalDataConstants;
 import org.apache.asterix.om.base.ABoolean;
 import org.apache.asterix.om.base.ANull;
 import org.apache.asterix.om.pointables.base.DefaultOpenFieldType;
@@ -63,11 +64,11 @@ import io.delta.kernel.types.TimestampNTZType;
 import io.delta.kernel.types.TimestampType;
 
 public class DeltaDataParser extends AbstractDataParser implements 
IRecordDataParser<Row> {
-    private final ParserContext parserContext;
+    private final DeltaConverterContext parserContext;
     private final IExternalFilterValueEmbedder valueEmbedder;
 
-    public DeltaDataParser(IExternalDataRuntimeContext context) {
-        parserContext = new ParserContext();
+    public DeltaDataParser(IExternalDataRuntimeContext context, Map<String, 
String> conf) {
+        parserContext = new DeltaConverterContext(conf);
         valueEmbedder = context.getValueEmbedder();
     }
 
@@ -160,7 +161,6 @@ public class DeltaDataParser extends AbstractDataParser 
implements IRecordDataPa
         if (isNull) {
             return ATypeTag.NULL;
         }
-
         if (schema instanceof BooleanType) {
             return ATypeTag.BOOLEAN;
         } else if (schema instanceof ShortType || schema instanceof 
IntegerType || schema instanceof LongType) {
@@ -170,15 +170,24 @@ public class DeltaDataParser extends AbstractDataParser 
implements IRecordDataPa
         } else if (schema instanceof StringType) {
             return ATypeTag.STRING;
         } else if (schema instanceof DateType) {
-            return ATypeTag.BIGINT;
+            if (parserContext.isDateAsInt()) {
+                return ATypeTag.INTEGER;
+            }
+            return ATypeTag.DATE;
         } else if (schema instanceof TimestampType || schema instanceof 
TimestampNTZType) {
-            return ATypeTag.BIGINT;
+            if (parserContext.isTimestampAsLong()) {
+                return ATypeTag.BIGINT;
+            }
+            return ATypeTag.DATETIME;
         } else if (schema instanceof BinaryType) {
             return ATypeTag.BINARY;
         } else if (schema instanceof ArrayType) {
             return ATypeTag.ARRAY;
         } else if (schema instanceof StructType) {
             return ATypeTag.OBJECT;
+        } else if (schema instanceof DecimalType) {
+            ensureDecimalToDoubleEnabled(schema, parserContext);
+            return ATypeTag.DOUBLE;
         } else {
             throw createUnsupportedException(schema);
         }
@@ -204,11 +213,26 @@ public class DeltaDataParser extends AbstractDataParser 
implements IRecordDataPa
         } else if (schema instanceof StringType) {
             serializeString(row.getString(index), out);
         } else if (schema instanceof DateType) {
-            serializeDate(row.getInt(index), out);
+            if (parserContext.isDateAsInt()) {
+                serializeLong(row.getInt(index), out);
+            } else {
+                parserContext.serializeDate(row.getInt(index), out);
+            }
         } else if (schema instanceof TimestampType) {
-            serializeTimestamp(row.getLong(index), out);
+            long timeStampInMillis = 
TimeUnit.MICROSECONDS.toMillis(row.getLong(index));
+            int offset = parserContext.getTimeZoneOffset();
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis + offset, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis + offset, 
out);
+            }
         } else if (schema instanceof TimestampNTZType) {
-            serializeTimestamp(row.getLong(index), out);
+            long timeStampInMillis = 
TimeUnit.MICROSECONDS.toMillis(row.getLong(index));
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis, out);
+            }
         } else if (schema instanceof StructType) {
             parseObject(row.getStruct(index), out);
         } else if (schema instanceof ArrayType) {
@@ -240,11 +264,26 @@ public class DeltaDataParser extends AbstractDataParser 
implements IRecordDataPa
         } else if (schema instanceof StringType) {
             serializeString(column.getString(index), out);
         } else if (schema instanceof DateType) {
-            serializeDate(column.getInt(index), out);
+            if (parserContext.isDateAsInt()) {
+                serializeLong(column.getInt(index), out);
+            } else {
+                parserContext.serializeDate(column.getInt(index), out);
+            }
         } else if (schema instanceof TimestampType) {
-            serializeTimestamp(column.getLong(index), out);
+            long timeStampInMillis = 
TimeUnit.MICROSECONDS.toMillis(column.getLong(index));
+            int offset = parserContext.getTimeZoneOffset();
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis + offset, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis + offset, 
out);
+            }
         } else if (schema instanceof TimestampNTZType) {
-            serializeTimestamp(column.getLong(index), out);
+            long timeStampInMillis = 
TimeUnit.MICROSECONDS.toMillis(column.getLong(index));
+            if (parserContext.isTimestampAsLong()) {
+                serializeLong(timeStampInMillis, out);
+            } else {
+                parserContext.serializeDateTime(timeStampInMillis, out);
+            }
         } else if (schema instanceof ArrayType) {
             parseArray((ArrayType) schema, column.getArray(index), out);
         } else if (schema instanceof StructType) {
@@ -273,16 +312,6 @@ public class DeltaDataParser extends AbstractDataParser 
implements IRecordDataPa
         stringSerde.serialize(aString, out);
     }
 
-    private void serializeDate(Object value, DataOutput out) throws 
HyracksDataException {
-        aInt32.setValue((Integer) value);
-        int32Serde.serialize(aInt32, out);
-    }
-
-    private void serializeTimestamp(Object value, DataOutput out) throws 
HyracksDataException {
-        aInt64.setValue(TimeUnit.MICROSECONDS.toMillis((Long) value));
-        int64Serde.serialize(aInt64, out);
-    }
-
     private void serializeDecimal(BigDecimal value, DataOutput out) throws 
HyracksDataException {
         serializeDouble(value.doubleValue(), out);
     }
@@ -290,4 +319,12 @@ public class DeltaDataParser extends AbstractDataParser 
implements IRecordDataPa
     private static HyracksDataException createUnsupportedException(DataType 
schema) {
         return new RuntimeDataException(ErrorCode.TYPE_UNSUPPORTED, "Delta 
Parser", schema.toString());
     }
+
+    private static void ensureDecimalToDoubleEnabled(DataType type, 
DeltaConverterContext context)
+            throws RuntimeDataException {
+        if (!context.isDecimalToDoubleEnabled()) {
+            throw new 
RuntimeDataException(ErrorCode.PARQUET_SUPPORTED_TYPE_WITH_OPTION, 
type.toString(),
+                    ExternalDataConstants.ParquetOptions.DECIMAL_TO_DOUBLE);
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
index 5d4b2dd199..7f281053b8 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/parser/factory/DeltaTableDataParserFactory.java
@@ -60,7 +60,6 @@ public class DeltaTableDataParserFactory extends 
AbstractGenericDataParserFactor
     }
 
     private DeltaDataParser createParser(IExternalDataRuntimeContext context) {
-        return new DeltaDataParser(context);
+        return new DeltaDataParser(context, configuration);
     }
-
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
index d8f89c22d7..ffe75cb68b 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataConstants.java
@@ -19,7 +19,6 @@
 package org.apache.asterix.external.util;
 
 import java.util.Set;
-import java.util.TimeZone;
 import java.util.function.LongSupplier;
 import java.util.function.Supplier;
 import java.util.regex.Pattern;
@@ -328,6 +327,16 @@ public class ExternalDataConstants {
         WRITER_SUPPORTED_COMPRESSION = Set.of(KEY_COMPRESSION_GZIP);
     }
 
+    public static class DeltaOptions {
+        private DeltaOptions() {
+        }
+
+        public static final String DECIMAL_TO_DOUBLE = "decimal-to-double";
+        public static final String TIMESTAMP_AS_LONG = "timestamp-to-long";
+        public static final String DATE_AS_INT = "date-to-int";
+        public static final String TIMEZONE = "timezone";
+    }
+
     public static class ParquetOptions {
         private ParquetOptions() {
         }
@@ -357,10 +366,5 @@ public class ExternalDataConstants {
          */
         public static final String TIMEZONE = "timezone";
         public static final String HADOOP_TIMEZONE = ASTERIX_HADOOP_PREFIX + 
TIMEZONE;
-
-        /**
-         * Valid time zones that are supported by Java
-         */
-        public static final Set<String> VALID_TIME_ZONES = 
Set.of(TimeZone.getAvailableIDs());
     }
 }
diff --git 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
index bf74079dca..bc5b8c3326 100644
--- 
a/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
+++ 
b/asterixdb/asterix-external-data/src/main/java/org/apache/asterix/external/util/ExternalDataUtils.java
@@ -48,6 +48,8 @@ import java.util.EnumMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
+import java.util.TimeZone;
 import java.util.function.BiPredicate;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -109,6 +111,8 @@ import org.apache.iceberg.hadoop.HadoopTables;
 import org.apache.iceberg.io.CloseableIterable;
 
 public class ExternalDataUtils {
+
+    private static final Set<String> validTimeZones = 
Set.of(TimeZone.getAvailableIDs());
     private static final Map<ATypeTag, IValueParserFactory> 
valueParserFactoryMap = new EnumMap<>(ATypeTag.class);
     private static final int DEFAULT_MAX_ARGUMENT_SZ = 1024 * 1024;
     private static final int HEADER_FUDGE = 64;
@@ -493,6 +497,11 @@ public class ExternalDataUtils {
             throw new 
CompilationException(ErrorCode.INVALID_DELTA_TABLE_FORMAT,
                     configuration.get(ExternalDataConstants.KEY_FORMAT));
         }
+        if 
(configuration.containsKey(ExternalDataConstants.DeltaOptions.TIMEZONE)
+                && 
!validTimeZones.contains(configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE)))
 {
+            throw new CompilationException(ErrorCode.INVALID_TIMEZONE,
+                    
configuration.get(ExternalDataConstants.DeltaOptions.TIMEZONE));
+        }
     }
 
     public static void prepareIcebergTableFormat(Map<String, String> 
configuration, Configuration conf,
@@ -928,7 +937,7 @@ public class ExternalDataUtils {
             if (datasetRecordType.getFieldTypes().length != 0) {
                 throw new 
CompilationException(ErrorCode.UNSUPPORTED_TYPE_FOR_PARQUET, 
datasetRecordType.getTypeName());
             } else if (properties.containsKey(ParquetOptions.TIMEZONE)
-                    && 
!ParquetOptions.VALID_TIME_ZONES.contains(properties.get(ParquetOptions.TIMEZONE)))
 {
+                    && 
!validTimeZones.contains(properties.get(ParquetOptions.TIMEZONE))) {
                 //Ensure the configured time zone id is correct
                 throw new CompilationException(ErrorCode.INVALID_TIMEZONE, 
properties.get(ParquetOptions.TIMEZONE));
             }

Reply via email to