twalthr commented on a change in pull request #13909:
URL: https://github.com/apache/flink/pull/13909#discussion_r518805037



##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatFactory.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.raw;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Format factory for the raw format, which allows reading and writing raw (byte-based) values as a single column.
+ */
+public class RawFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
+
+       public static final String IDENTIFIER = "raw";
+       private static final String BIG_ENDIAN = "big-endian";
+       private static final String LITTLE_ENDIAN = "little-endian";
+
+       public static final ConfigOption<String> ENDIANNESS = ConfigOptions
+               .key("endianness")
+               .stringType()
+               .defaultValue(BIG_ENDIAN)
+               .withDescription("Defines the endianness of the bytes of the numeric value.");

Review comment:
       nit: `Defines the endianness for bytes of numeric values.`

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatDeserializationSchema.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.raw;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from raw (byte-based) value to the Flink Table/SQL internal data structure {@link RowData}.
+ */
+@Internal
+public class RawFormatDeserializationSchema implements DeserializationSchema<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final LogicalType deserializedType;
+
+       private final DeserializationRuntimeConverter converter;
+
+       private final TypeInformation<RowData> producedTypeInfo;
+
+       private final String charsetName;
+
+       private final boolean isBigEndian;
+
+       private transient GenericRowData reuse;
+
+       public RawFormatDeserializationSchema(
+                       LogicalType deserializedType,
+                       TypeInformation<RowData> producedTypeInfo,
+                       String charsetName,
+                       boolean isBigEndian) {
+               this.deserializedType = checkNotNull(deserializedType);
+               this.producedTypeInfo = checkNotNull(producedTypeInfo);
+               this.converter = createConverter(deserializedType, charsetName, isBigEndian);
+               this.charsetName = charsetName;
+               this.isBigEndian = isBigEndian;
+       }
+
+       @Override
+       public void open(InitializationContext context) throws Exception {
+               reuse = new GenericRowData(1);
+               converter.open();
+       }
+
+       @Override
+       public RowData deserialize(byte[] message) throws IOException {
+               reuse.setField(0, converter.convert(message));
+               return reuse;
+       }
+
+       @Override
+       public boolean isEndOfStream(RowData nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<RowData> getProducedType() {
+               return producedTypeInfo;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               RawFormatDeserializationSchema that = (RawFormatDeserializationSchema) o;
+               return producedTypeInfo.equals(that.producedTypeInfo) &&
+                       deserializedType.equals(that.deserializedType) &&
+                       charsetName.equals(that.charsetName) &&
+                       isBigEndian == that.isBigEndian;
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(producedTypeInfo, deserializedType, charsetName, isBigEndian);
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Runtime converter that converts byte[] into an internal data structure object.
+        */
+       @FunctionalInterface
+       private interface DeserializationRuntimeConverter extends Serializable {
+
+               default void open() {}
+
+               Object convert(byte[] data) throws IOException;
+       }
+
+       /**
+        * Creates a runtime converter.
+        */
+       private static DeserializationRuntimeConverter createConverter(
+                       LogicalType type, String charsetName, boolean isBigEndian) {
+               final DeserializationRuntimeConverter converter = createNotNullConverter(type, charsetName, isBigEndian);
+               final Consumer<byte[]> validator = createDataLengthValidator(type);
+
+               return new DeserializationRuntimeConverter() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void open() {
+                               converter.open();
+                       }
+
+                       @Override
+                       public Object convert(byte[] data) throws IOException {
+                               if (data == null) {
+                                       return null;
+                               }
+                               validator.accept(data);
+                               return converter.convert(data);
+                       }
+               };
+       }
+
+       private static DeserializationRuntimeConverter createNotNullConverter(
+                       LogicalType type, String charsetName, boolean isBigEndian) {
+               switch (type.getTypeRoot()) {
+                       case CHAR:
+                       case VARCHAR:
+                               return createStringConverter(charsetName);
+
+                       case VARBINARY:
+                       case BINARY:
+                               return data -> data;
+
+                       case RAW:
+                               return createRawValueConverter((RawType<?>) type);
+
+                       case BOOLEAN:
+                               return data -> data[0] != 0;
+
+                       case TINYINT:
+                               return data -> data[0];
+
+                       case SMALLINT:
+                               return createEndiannessAwareConverter(
+                                       isBigEndian,
+                                       segment -> segment.getShortBigEndian(0),
+                                       segment -> segment.getShortLittleEndian(0)
+                               );
+
+                       case INTEGER:
+                               return createEndiannessAwareConverter(
+                                       isBigEndian,
+                                       segment -> segment.getIntBigEndian(0),
+                                       segment -> segment.getIntLittleEndian(0)
+                               );
+
+                       case BIGINT:
+                               return createEndiannessAwareConverter(
+                                       isBigEndian,
+                                       segment -> segment.getLongBigEndian(0),
+                                       segment -> segment.getLongLittleEndian(0)
+                               );
+
+                       case FLOAT:
+                               return createEndiannessAwareConverter(
+                                       isBigEndian,
+                                       segment -> segment.getFloatBigEndian(0),
+                                       segment -> segment.getFloatLittleEndian(0)
+                               );
+
+                       case DOUBLE:
+                               return createEndiannessAwareConverter(
+                                       isBigEndian,
+                                       segment -> segment.getDoubleBigEndian(0),
+                                       segment -> segment.getDoubleLittleEndian(0)
+                               );
+
+                       default:
+                               throw new UnsupportedOperationException("'raw' format currently doesn't support type: " + type);
+               }
+       }
+
+       private static DeserializationRuntimeConverter createStringConverter(final String charsetName) {
+               // this also checks that the charsetName is valid
+               Charset charset = Charset.forName(charsetName);
+               if (charset == StandardCharsets.UTF_8) {
+                       // avoid UTF-8 decoding if the given charset is UTF-8,
+                       // because the underlying bytes of StringData are already UTF-8 encoded
+                       return StringData::fromBytes;
+               }
+
+               return new DeserializationRuntimeConverter() {
+                       private static final long serialVersionUID = 1L;
+                       private transient Charset charset;
+                       @Override
+                       public void open() {
+                               charset = Charset.forName(charsetName);
+                       }
+
+                       @Override
+                       public Object convert(byte[] data) {
+                               String str = new String(data, charset);
+                               return StringData.fromString(str);
+                       }
+               };
+       }
+
+       private static DeserializationRuntimeConverter createRawValueConverter(RawType<?> rawType) {
+               final TypeSerializer<?> serializer = rawType.getTypeSerializer();
+
+               return new DeserializationRuntimeConverter() {
+                       private static final long serialVersionUID = 1L;
+                       private final DataInputDeserializer source = new DataInputDeserializer();
+                       @Override
+                       public Object convert(byte[] data) throws IOException {
+                               source.setBuffer(data);
+                               return RawValueData.fromObject(serializer.deserialize(source));

Review comment:
       can't we use `RawValueData.fromBytes` here and do lazy deserialization?
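
       For illustration, a minimal sketch of that suggestion, assuming the reviewer means `RawValueData.fromBytes(byte[])`, which wraps the serialized bytes without eagerly deserializing them:

```java
// Sketch only, not the PR's code: wrap the raw bytes directly so that
// deserialization happens lazily, only when a consumer materializes the
// value with the RawType's TypeSerializer. The serializer and the
// DataInputDeserializer are then no longer needed at conversion time.
private static DeserializationRuntimeConverter createRawValueConverter(RawType<?> rawType) {
    return RawValueData::fromBytes;
}
```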

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatFactory.java
##########
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.raw;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.configuration.ConfigOption;
+import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.format.DecodingFormat;
+import org.apache.flink.table.connector.format.EncodingFormat;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.factories.DeserializationFormatFactory;
+import org.apache.flink.table.factories.DynamicTableFactory.Context;
+import org.apache.flink.table.factories.FactoryUtil;
+import org.apache.flink.table.factories.SerializationFormatFactory;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
+
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Format factory for the raw format, which allows reading and writing raw (byte-based) values as a single column.
+ */
+public class RawFormatFactory implements DeserializationFormatFactory, SerializationFormatFactory {
+
+       public static final String IDENTIFIER = "raw";
+       private static final String BIG_ENDIAN = "big-endian";
+       private static final String LITTLE_ENDIAN = "little-endian";
+
+       public static final ConfigOption<String> ENDIANNESS = ConfigOptions
+               .key("endianness")
+               .stringType()
+               .defaultValue(BIG_ENDIAN)
+               .withDescription("Defines the endianness of the bytes of the numeric value.");
+
+       public static final ConfigOption<String> CHARSET = ConfigOptions
+               .key("charset")
+               .stringType()
+               .defaultValue(StandardCharsets.UTF_8.displayName())
+               .withDescription("Defines the string charset.");
+
+       @Override
+       public String factoryIdentifier() {
+               return IDENTIFIER;
+       }
+
+       @Override
+       public Set<ConfigOption<?>> requiredOptions() {
+               return Collections.emptySet();
+       }
+
+       @Override
+       public Set<ConfigOption<?>> optionalOptions() {
+               Set<ConfigOption<?>> options = new HashSet<>();
+               options.add(ENDIANNESS);
+               options.add(CHARSET);
+               return options;
+       }
+
+       @Override
+       public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
+                       Context context,
+                       ReadableConfig formatOptions) {
+               FactoryUtil.validateFactoryOptions(this, formatOptions);
+               final String charsetName = validateAndGetCharsetName(formatOptions);
+               final boolean isBigEndian = isBigEndian(formatOptions);
+
+               return new DecodingFormat<DeserializationSchema<RowData>>() {
+                       @Override
+                       public DeserializationSchema<RowData> createRuntimeDecoder(
+                                       DynamicTableSource.Context context,
+                                       DataType producedDataType) {
+                               final RowType physicalRowType = (RowType) producedDataType.getLogicalType();
+                               final LogicalType fieldType = validateAndExtractSingleField(physicalRowType);
+                               final TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(producedDataType);
+                               return new RawFormatDeserializationSchema(fieldType, producedTypeInfo, charsetName, isBigEndian);
+                       }
+
+                       @Override
+                       public ChangelogMode getChangelogMode() {
+                               return ChangelogMode.insertOnly();
+                       }
+               };
+       }
+
+       @Override
+       public EncodingFormat<SerializationSchema<RowData>> createEncodingFormat(
+                       Context context, ReadableConfig formatOptions) {
+               FactoryUtil.validateFactoryOptions(this, formatOptions);
+               final String charsetName = validateAndGetCharsetName(formatOptions);
+               final boolean isBigEndian = isBigEndian(formatOptions);
+
+               return new EncodingFormat<SerializationSchema<RowData>>() {
+                       @Override
+                       public SerializationSchema<RowData> createRuntimeEncoder(
+                                       DynamicTableSink.Context context,
+                                       DataType consumedDataType) {
+                               final RowType physicalRowType = (RowType) consumedDataType.getLogicalType();
+                               final LogicalType fieldType = validateAndExtractSingleField(physicalRowType);
+                               return new RawFormatSerializationSchema(fieldType, charsetName, isBigEndian);
+                       }
+
+                       @Override
+                       public ChangelogMode getChangelogMode() {
+                               return ChangelogMode.insertOnly();
+                       }
+               };
+       }
+
+       // ------------------------------------------------------------------------------------------
+
+       /**
+        * Validates and extracts the single field type from the given physical row schema.
+        */
+       private static LogicalType validateAndExtractSingleField(RowType physicalRowType) {
+               if (physicalRowType.getFieldCount() != 1) {

Review comment:
       Shall we perform real type validation here? E.g. TIMESTAMP or NULL would still pass.
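
       For example, a hypothetical stricter check, reusing the PR's names but with an illustrative switch over the type roots the format's converters support:

```java
// Illustrative sketch only: validate the field's type root in addition to the
// field count, so unsupported types such as TIMESTAMP fail fast at planning time.
private static LogicalType validateAndExtractSingleField(RowType physicalRowType) {
    if (physicalRowType.getFieldCount() != 1) {
        throw new ValidationException("The 'raw' format only works with a single physical column.");
    }
    final LogicalType fieldType = physicalRowType.getTypeAt(0);
    switch (fieldType.getTypeRoot()) {
        case CHAR:
        case VARCHAR:
        case BINARY:
        case VARBINARY:
        case RAW:
        case BOOLEAN:
        case TINYINT:
        case SMALLINT:
        case INTEGER:
        case BIGINT:
        case FLOAT:
        case DOUBLE:
            return fieldType;
        default:
            throw new ValidationException("The 'raw' format doesn't support type: " + fieldType);
    }
}
```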

##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/raw/RawFormatDeserializationSchema.java
##########
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.raw;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.HeapMemorySegment;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RawValueData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RawType;
+import org.apache.flink.types.DeserializationException;
+import org.apache.flink.util.function.FunctionUtils;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.function.Consumer;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from raw (byte-based) value to the Flink Table/SQL internal data structure {@link RowData}.
+ */
+@Internal
+public class RawFormatDeserializationSchema implements DeserializationSchema<RowData> {
+
+       private static final long serialVersionUID = 1L;
+
+       private final LogicalType deserializedType;
+
+       private final DeserializationRuntimeConverter converter;
+
+       private final TypeInformation<RowData> producedTypeInfo;
+
+       private final String charsetName;
+
+       private final boolean isBigEndian;
+
+       private transient GenericRowData reuse;
+
+       public RawFormatDeserializationSchema(
+                       LogicalType deserializedType,
+                       TypeInformation<RowData> producedTypeInfo,
+                       String charsetName,
+                       boolean isBigEndian) {
+               this.deserializedType = checkNotNull(deserializedType);
+               this.producedTypeInfo = checkNotNull(producedTypeInfo);
+               this.converter = createConverter(deserializedType, charsetName, isBigEndian);
+               this.charsetName = charsetName;
+               this.isBigEndian = isBigEndian;
+       }
+
+       @Override
+       public void open(InitializationContext context) throws Exception {
+               reuse = new GenericRowData(1);
+               converter.open();
+       }
+
+       @Override
+       public RowData deserialize(byte[] message) throws IOException {
+               reuse.setField(0, converter.convert(message));
+               return reuse;
+       }
+
+       @Override
+       public boolean isEndOfStream(RowData nextElement) {
+               return false;
+       }
+
+       @Override
+       public TypeInformation<RowData> getProducedType() {
+               return producedTypeInfo;
+       }
+
+       @Override
+       public boolean equals(Object o) {
+               if (this == o) {
+                       return true;
+               }
+               if (o == null || getClass() != o.getClass()) {
+                       return false;
+               }
+               RawFormatDeserializationSchema that = (RawFormatDeserializationSchema) o;
+               return producedTypeInfo.equals(that.producedTypeInfo) &&
+                       deserializedType.equals(that.deserializedType) &&
+                       charsetName.equals(that.charsetName) &&
+                       isBigEndian == that.isBigEndian;
+       }
+
+       @Override
+       public int hashCode() {
+               return Objects.hash(producedTypeInfo, deserializedType, charsetName, isBigEndian);
+       }
+
+       // ------------------------------------------------------------------------
+
+       /**
+        * Runtime converter that converts byte[] into an internal data structure object.
+        */
+       @FunctionalInterface
+       private interface DeserializationRuntimeConverter extends Serializable {
+
+               default void open() {}
+
+               Object convert(byte[] data) throws IOException;
+       }
+
+       /**
+        * Creates a runtime converter.
+        */
+       private static DeserializationRuntimeConverter createConverter(
+                       LogicalType type, String charsetName, boolean isBigEndian) {
+               final DeserializationRuntimeConverter converter = createNotNullConverter(type, charsetName, isBigEndian);
+               final Consumer<byte[]> validator = createDataLengthValidator(type);
+
+               return new DeserializationRuntimeConverter() {
+                       private static final long serialVersionUID = 1L;
+
+                       @Override
+                       public void open() {
+                               converter.open();
+                       }
+
+                       @Override
+                       public Object convert(byte[] data) throws IOException {
+                               if (data == null) {

Review comment:
       I think data is never null at this location.
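
       If that guarantee indeed holds (the surrounding `DeserializationSchema` contract hands complete records to `deserialize`), the anonymous wrapper's `convert` could drop the guard; a minimal sketch under that assumption:

```java
// Sketch under the reviewer's assumption that data is never null at this
// point: skip the null check and always validate and convert.
@Override
public Object convert(byte[] data) throws IOException {
    validator.accept(data);
    return converter.convert(data);
}
```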




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

