twalthr commented on a change in pull request #13909:
URL: https://github.com/apache/flink/pull/13909#discussion_r518007361
##########
File path: docs/dev/table/connectors/formats/index.md
##########
@@ -68,5 +68,10 @@ Flink supports the following formats:
<td><a href="{% link dev/table/connectors/formats/orc.md %}">Apache
ORC</a></td>
<td><a href="{% link dev/table/connectors/filesystem.md
%}">Filesystem</a></td>
</tr>
+ <tr>
+      <td><a href="{% link dev/table/connectors/formats/singleValue.md %}">Single Value</a></td>
Review comment:
According to the offline discussion, call the format `raw` instead.
##########
File path: docs/dev/table/connectors/formats/single-field.md
##########
@@ -0,0 +1,151 @@
+---
+title: "Single Field Format"
+nav-title: SingleField
+nav-parent_id: sql-formats
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-info">Format: Serialization Schema</span>
+<span class="label label-info">Format: Deserialization Schema</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The SingleField format allows reading and writing data that contains only a single field, where that field is not wrapped in a JSON object or an Avro record.
+
+Currently, the SingleField format supports `String`, `byte[]` and primitive types.
+
+Note: this format encodes `null` values as `null` `byte[]`. This may have limitations when used with `upsert-kafka`, because `upsert-kafka` treats `null` values as tombstone messages (DELETE on the key). Therefore, we recommend avoiding using the `upsert-kafka` connector together with the `single-field` format if the field can have a `null` value.
Review comment:
```
we recommend avoiding using the `upsert-kafka` connector and the `raw` format as a `value.format`
```
We should emphasize that a key format is fine.
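To make the distinction concrete, here is a minimal, hypothetical sketch (submitted via the Java Table API; table name, topic, and fields are illustrative, and the option names assume the renamed `raw` format and the `upsert-kafka` connector) of the format used only on the key side, where keys are never `null`:
```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertKafkaRawKeySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
            EnvironmentSettings.newInstance().inStreamingMode().build());
        // 'raw' as the key format is fine: upsert-kafka keys are never null.
        // A structured format handles the (nullable) value side.
        tEnv.executeSql(
            "CREATE TABLE page_views (" +
            "  page_id STRING," +
            "  view_count BIGINT," +
            "  PRIMARY KEY (page_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'upsert-kafka'," +
            "  'topic' = 'page_views'," +
            "  'properties.bootstrap.servers' = 'localhost:9092'," +
            "  'key.format' = 'raw'," +
            "  'value.format' = 'json'" +
            ")");
    }
}
```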
##########
File path: docs/dev/table/connectors/formats/single-field.md
##########
@@ -0,0 +1,151 @@
+---
+title: "Single Field Format"
+nav-title: SingleField
+nav-parent_id: sql-formats
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-info">Format: Serialization Schema</span>
+<span class="label label-info">Format: Deserialization Schema</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The SingleField format allows reading and writing data that contains only a single field, where that field is not wrapped in a JSON object or an Avro record.
+
+Currently, the SingleField format supports `String`, `byte[]` and primitive types.
+
+Note: this format encodes `null` values as `null` `byte[]`. This may have limitations when used with `upsert-kafka`, because `upsert-kafka` treats `null` values as tombstone messages (DELETE on the key). Therefore, we recommend avoiding using the `upsert-kafka` connector together with the `single-field` format if the field can have a `null` value.
+
+Example
+----------------
+
+For example, you may have the following raw log data in Kafka and want to read and analyse such data using Flink SQL.
+
+```
+47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 "https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"
+```
+
+The following statement creates a table that reads from (and writes to) the underlying Kafka topic as an anonymous string value using the `single-field` format:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE nginx_log (
+ log STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'nginx_log',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'format' = 'single-field'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Then you can read out the raw data as a pure string and split it into multiple fields with a user-defined function for further analysis, e.g. `my_split` in the example.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT t.hostname, t.datetime, t.url, t.browser, ...
+FROM (
+  SELECT my_split(log) AS t FROM nginx_log
+);
+{% endhighlight %}
+</div>
+</div>
+
+Conversely, you can also write a single field of STRING type into a Kafka topic as an anonymous string value.
+
+Format Options
+----------------
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Specify what format to use; here it should be 'single-field'.</td>
+ </tr>
+ </tbody>
+</table>
+
+Data Type Mapping
+----------------
+
+The table below details the SQL types the format supports, including how values are encoded and decoded by the serializer and deserializer.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left">Flink SQL type</th>
+ <th class="text-left">Value</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><code>CHAR / VARCHAR / STRING</code></td>
+ <td>A UTF-8 encoded text string</td>
+ </tr>
+ <tr>
+ <td><code>BOOLEAN</code></td>
+      <td>A single byte indicating the boolean value: 0 means false, 1 means true.</td>
+ </tr>
+ <tr>
+ <td><code>TINYINT</code></td>
+      <td>An 8-bit signed number</td>
+ </tr>
+ <tr>
+ <td><code>SMALLINT</code></td>
+ <td>A 16-bit signed number</td>
+ </tr>
+ <tr>
+ <td><code>INT</code></td>
+ <td>A 32-bit signed integer</td>
+ </tr>
+ <tr>
+ <td><code>BIGINT</code></td>
+ <td>A 64-bit signed integer</td>
+ </tr>
+ <tr>
+ <td><code>FLOAT</code></td>
+ <td>A 32-bit floating point number</td>
+ </tr>
+ <tr>
Review comment:
let's also support BINARY/VARBINARY/BYTES and the RAW type itself to finalize this story
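For the RAW branch, a rough sketch (assuming Flink's `RawType` and `RawValueData` APIs; not part of this PR) could delegate to the serializer carried by the type itself:
```java
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.table.data.RawValueData;
import org.apache.flink.table.types.logical.RawType;

final class RawTypeConverterSketch {
    /** Serializes a RAW field to bytes using the serializer carried by the type. */
    static byte[] toBytes(RawValueData<Object> value, RawType<Object> type) {
        if (value == null) {
            return null;
        }
        TypeSerializer<Object> serializer = type.getTypeSerializer();
        return value.toBytes(serializer);
    }
}
```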
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldSerializationSchema.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes a {@link RowData} object into the bytes of a single field.
+ */
+@Internal
+public class SingleFieldSerializationSchema implements SerializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final LogicalType fieldType;
+
+ private final SerializationRuntimeConverter converter;
+
+ private final FieldGetter fieldGetter;
+
+ public SingleFieldSerializationSchema(LogicalType fieldType) {
+ this.fieldType = fieldType;
+ this.fieldGetter = RowData.createFieldGetter(fieldType, 0);
+ this.converter = createConverter(fieldType);
+ }
+
+ @Override
+ public byte[] serialize(RowData row) {
+ try {
+            return converter.convert(fieldGetter.getFieldOrNull(row));
+ } catch (IOException e) {
+            throw new RuntimeException("Could not serialize row '" + row + "'.", e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+        SingleFieldSerializationSchema that = (SingleFieldSerializationSchema) o;
+ return fieldType.equals(that.fieldType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fieldType);
+ }
+
+    // ------------------------------------------------------------------------
+
+ /**
+     * Runtime converter that converts an object of the internal data structure to byte[].
+ */
+ @FunctionalInterface
+ private interface SerializationRuntimeConverter extends Serializable {
+ byte[] convert(Object value) throws IOException;
+ }
+
+ /**
+ * Creates a runtime converter.
+ */
+    private SerializationRuntimeConverter createConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return value -> {
+                // put the null check logic in the lambda instead of wrapping outside
+                // to avoid a virtual method invocation.
+ if (value == null) {
+ return null;
+ }
+ return ((StringData) value).toBytes();
+ };
+ case VARBINARY:
+ case BINARY:
+ return value -> {
+ if (value == null) {
+ return null;
+ }
+ return (byte[]) value;
+ };
+            case TINYINT:
+                return createConverterUsingSerializer(ByteSerializer.INSTANCE);
+            case SMALLINT:
+                return createConverterUsingSerializer(ShortSerializer.INSTANCE);
+            case INTEGER:
+                return createConverterUsingSerializer(IntSerializer.INSTANCE);
+            case BIGINT:
+                return createConverterUsingSerializer(LongSerializer.INSTANCE);
+            case FLOAT:
+                return createConverterUsingSerializer(FloatSerializer.INSTANCE);
+            case DOUBLE:
+                return createConverterUsingSerializer(DoubleSerializer.INSTANCE);
+            case BOOLEAN:
+                return createConverterUsingSerializer(BooleanSerializer.INSTANCE);
+ default:
+                throw new UnsupportedOperationException("The 'single-field' format currently doesn't support type: " + type);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+    private static SerializationRuntimeConverter createConverterUsingSerializer(
+            TypeSerializer<?> serializer) {
+        return new DelegatingSerializationConverter((TypeSerializer<Object>) serializer);
+ }
+
+    private static final class DelegatingSerializationConverter
+            implements SerializationRuntimeConverter {
+ private static final long serialVersionUID = 1L;
+        private final DataOutputSerializer dos = new DataOutputSerializer(16);
+ private final TypeSerializer<Object> delegatingSerializer;
+
+        protected DelegatingSerializationConverter(TypeSerializer<Object> delegatingSerializer) {
+ this.delegatingSerializer = delegatingSerializer;
+ }
+
+ @Override
+ public byte[] convert(Object value) throws IOException {
+ if (value == null) {
+ return null;
+ }
+ delegatingSerializer.serialize(value, dos);
+ byte[] ret = dos.getCopyOfBuffer();
Review comment:
Same comment as before, this looks overly complicated for just converting a couple of data types to bytes. We should think about doing it manually.
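A minimal sketch of the manual approach (hypothetical, not from this PR): encode the fixed-width types directly with `java.nio.ByteBuffer` instead of routing through `TypeSerializer` and `DataOutputSerializer`:
```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class ManualEncodingSketch {
    // Big-endian (network byte order) chosen here; see the endianness
    // discussion below -- this would become configurable.
    static byte[] encodeInt(int value) {
        return ByteBuffer.allocate(4).order(ByteOrder.BIG_ENDIAN).putInt(value).array();
    }

    static byte[] encodeLong(long value) {
        return ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putLong(value).array();
    }

    static byte[] encodeDouble(double value) {
        return ByteBuffer.allocate(8).order(ByteOrder.BIG_ENDIAN).putDouble(value).array();
    }

    static byte[] encodeBoolean(boolean value) {
        return new byte[] {(byte) (value ? 1 : 0)};
    }
}
```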
##########
File path: flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/formats/singlefield/SingleFieldSerDeSchemaTest.java
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.conversion.DataStructureConverter;
+import org.apache.flink.table.data.conversion.DataStructureConverters;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static org.apache.flink.table.api.DataTypes.BIGINT;
+import static org.apache.flink.table.api.DataTypes.BINARY;
+import static org.apache.flink.table.api.DataTypes.BOOLEAN;
+import static org.apache.flink.table.api.DataTypes.BYTES;
+import static org.apache.flink.table.api.DataTypes.DOUBLE;
+import static org.apache.flink.table.api.DataTypes.FIELD;
+import static org.apache.flink.table.api.DataTypes.FLOAT;
+import static org.apache.flink.table.api.DataTypes.INT;
+import static org.apache.flink.table.api.DataTypes.ROW;
+import static org.apache.flink.table.api.DataTypes.SMALLINT;
+import static org.apache.flink.table.api.DataTypes.STRING;
+import static org.apache.flink.table.api.DataTypes.TINYINT;
+import static org.apache.flink.table.api.DataTypes.VARCHAR;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link SingleFieldDeserializationSchema} and {@link SingleFieldSerializationSchema}.
+ */
+public class SingleFieldSerDeSchemaTest {
+
+ @Test
+ public void testSerializationAndDeserialization() throws Exception {
+ for (TestSpec testSpec : testData) {
+ runTest(testSpec);
+ }
+ }
+
+ private void runTest(TestSpec testSpec) throws Exception {
+        SingleFieldDeserializationSchema deserializationSchema = new SingleFieldDeserializationSchema(
+                testSpec.type.getLogicalType(), TypeInformation.of(RowData.class));
+        SingleFieldSerializationSchema serializationSchema = new SingleFieldSerializationSchema(
+                testSpec.type.getLogicalType());
+        deserializationSchema.open(mock(DeserializationSchema.InitializationContext.class));
+        serializationSchema.open(mock(SerializationSchema.InitializationContext.class));
+
+        Row row = Row.of(testSpec.value);
+        DataStructureConverter<Object, Object> converter = DataStructureConverters.getConverter(
+                ROW(FIELD("single", testSpec.type)));
+        RowData originalRowData = (RowData) converter.toInternal(row);
+
+        byte[] serializedBytes = serializationSchema.serialize(originalRowData);
+        RowData deserializedRowData = deserializationSchema.deserialize(serializedBytes);
+
+        Row actual = (Row) converter.toExternal(deserializedRowData);
+        assertEquals(row, actual);
+ }
+
+ private static List<TestSpec> testData = Arrays.asList(
Review comment:
Use a JUnit `@Parameterized` instead?
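A rough sketch of that restructuring, reusing the `TestSpec` and `runTest` already in this class (assuming `runTest` stays accessible; the loop in `testSerializationAndDeserialization` goes away and each spec becomes its own test case):
```java
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.List;

@RunWith(Parameterized.class)
public class SingleFieldSerDeSchemaTest {

    @Parameterized.Parameters(name = "{0}")
    public static List<TestSpec> parameters() {
        return testData; // the existing static list of TestSpecs
    }

    @Parameterized.Parameter
    public TestSpec testSpec;

    @Test
    public void testSerializationAndDeserialization() throws Exception {
        runTest(testSpec); // each TestSpec now passes or fails individually
    }
}
```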
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldDeserializationSchema.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from single-field data to the Flink Table/SQL internal data structure {@link RowData}.
+ */
+@Internal
+public class SingleFieldDeserializationSchema implements DeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final LogicalType fieldType;
+ private final DeserializationRuntimeConverter converter;
+ private TypeInformation<RowData> typeInfo;
Review comment:
nit: call this `producedTypeInfo` for consistency.
Can it be `final`?
##########
File path: docs/dev/table/connectors/formats/single-field.md
##########
@@ -0,0 +1,151 @@
+---
+title: "Single Field Format"
+nav-title: SingleField
+nav-parent_id: sql-formats
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-info">Format: Serialization Schema</span>
+<span class="label label-info">Format: Deserialization Schema</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The SingleField format allows reading and writing data that contains only a single field, where that field is not wrapped in a JSON object or an Avro record.
+
+Currently, the SingleField format supports `String`, `byte[]` and primitive types.
+
+Note: this format encodes `null` values as `null` `byte[]`. This may have limitations when used with `upsert-kafka`, because `upsert-kafka` treats `null` values as tombstone messages (DELETE on the key). Therefore, we recommend avoiding using the `upsert-kafka` connector together with the `single-field` format if the field can have a `null` value.
+
+Example
+----------------
+
+For example, you may have the following raw log data in Kafka and want to read and analyse such data using Flink SQL.
+
+```
+47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 "https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"
+```
+
+The following statement creates a table that reads from (and writes to) the underlying Kafka topic as an anonymous string value using the `single-field` format:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE nginx_log (
+ log STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'nginx_log',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'format' = 'single-field'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Then you can read out the raw data as a pure string and split it into multiple fields with a user-defined function for further analysis, e.g. `my_split` in the example.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT t.hostname, t.datetime, t.url, t.browser, ...
+FROM (
+  SELECT my_split(log) AS t FROM nginx_log
+);
+{% endhighlight %}
+</div>
+</div>
+
+Conversely, you can also write a single field of STRING type into a Kafka topic as an anonymous string value.
+
+Format Options
+----------------
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>format</h5></td>
+ <td>required</td>
+ <td style="word-wrap: break-word;">(none)</td>
+ <td>String</td>
+      <td>Specify what format to use; here it should be 'single-field'.</td>
+ </tr>
+ </tbody>
+</table>
+
+Data Type Mapping
+----------------
+
+The table below details the SQL types the format supports, including how values are encoded and decoded by the serializer and deserializer.
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left">Flink SQL type</th>
+ <th class="text-left">Value</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><code>CHAR / VARCHAR / STRING</code></td>
+ <td>A UTF-8 encoded text string</td>
Review comment:
We should also allow specifying a different charset. This should be configurable.
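A small sketch of what charset support could look like, assuming a format option (name hypothetical, e.g. `single-field.charset`) parsed into a `java.nio.charset.Charset`:
```java
import java.nio.charset.Charset;

final class CharsetConvertersSketch {
    static byte[] encode(String value, Charset charset) {
        return value == null ? null : value.getBytes(charset);
    }

    static String decode(byte[] data, Charset charset) {
        return data == null ? null : new String(data, charset);
    }

    public static void main(String[] args) {
        Charset latin1 = Charset.forName("ISO-8859-1"); // UTF-8 would stay the default
        byte[] bytes = encode("häuser", latin1);
        System.out.println(decode(bytes, latin1)); // häuser
    }
}
```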
##########
File path: docs/dev/table/connectors/formats/single-field.md
##########
@@ -0,0 +1,151 @@
+---
+title: "Single Field Format"
+nav-title: SingleField
+nav-parent_id: sql-formats
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-info">Format: Serialization Schema</span>
+<span class="label label-info">Format: Deserialization Schema</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The SingleField format allows reading and writing data that contains only a single field, where that field is not wrapped in a JSON object or an Avro record.
+
+Currently, the SingleField format supports `String`, `byte[]` and primitive types.
+
+Note: this format encodes `null` values as `null` `byte[]`. This may have limitations when used with `upsert-kafka`, because `upsert-kafka` treats `null` values as tombstone messages (DELETE on the key). Therefore, we recommend avoiding using the `upsert-kafka` connector together with the `single-field` format if the field can have a `null` value.
+
+Example
+----------------
+
+For example, you may have the following raw log data in Kafka and want to read and analyse such data using Flink SQL.
+
+```
+47.29.201.179 - - [28/Feb/2019:13:17:10 +0000] "GET /?p=1 HTTP/2.0" 200 5316 "https://domain.com/?p=1" "Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36" "2.75"
+```
+
+The following statement creates a table that reads from (and writes to) the underlying Kafka topic as an anonymous string value using the `single-field` format:
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+CREATE TABLE nginx_log (
+ log STRING
+) WITH (
+ 'connector' = 'kafka',
+ 'topic' = 'nginx_log',
+ 'properties.bootstrap.servers' = 'localhost:9092',
+ 'properties.group.id' = 'testGroup',
+ 'format' = 'single-field'
+)
+{% endhighlight %}
+</div>
+</div>
+
+Then you can read out the raw data as a pure string and split it into multiple fields with a user-defined function for further analysis, e.g. `my_split` in the example.
+
+<div class="codetabs" markdown="1">
+<div data-lang="SQL" markdown="1">
+{% highlight sql %}
+SELECT t.hostname, t.datetime, t.url, t.browser, ...
+FROM (
+  SELECT my_split(log) AS t FROM nginx_log
+);
+{% endhighlight %}
+</div>
+</div>
+
+Conversely, you can also write a single field of STRING type into a Kafka topic as an anonymous string value.
+
+Format Options
+----------------
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 25%">Option</th>
+ <th class="text-center" style="width: 8%">Required</th>
+ <th class="text-center" style="width: 7%">Default</th>
+ <th class="text-center" style="width: 10%">Type</th>
+ <th class="text-center" style="width: 50%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>format</h5></td>
Review comment:
We should also allow configuring the endianness. Every format that encodes to bytes requires this information.
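Sketch of how such an option could be declared and mapped to a `ByteOrder`, loosely following Flink's `ConfigOption` pattern (the option key and default are hypothetical):
```java
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.nio.ByteOrder;

final class EndiannessOptionSketch {
    static final ConfigOption<String> ENDIANNESS = ConfigOptions
        .key("single-field.endianness")
        .stringType()
        .defaultValue("big-endian"); // network byte order as a natural default

    static ByteOrder toByteOrder(String value) {
        switch (value) {
            case "big-endian":
                return ByteOrder.BIG_ENDIAN;
            case "little-endian":
                return ByteOrder.LITTLE_ENDIAN;
            default:
                throw new IllegalArgumentException("Unsupported endianness: " + value);
        }
    }
}
```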
##########
File path: docs/dev/table/connectors/formats/single-field.md
##########
@@ -0,0 +1,151 @@
+---
+title: "Single Field Format"
+nav-title: SingleField
+nav-parent_id: sql-formats
+nav-pos: 7
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+<span class="label label-info">Format: Serialization Schema</span>
+<span class="label label-info">Format: Deserialization Schema</span>
+
+* This will be replaced by the TOC
+{:toc}
+
+The SingleField format allows reading and writing data that contains only a single field, where that field is not wrapped in a JSON object or an Avro record.
+
+Currently, the SingleField format supports `String`, `byte[]` and primitive types.
+
+Note: this format encodes `null` values as `null` `byte[]`. This may have limitations when used with `upsert-kafka`, because `upsert-kafka` treats `null` values as tombstone messages (DELETE on the key). Therefore, we recommend avoiding using the `upsert-kafka` connector together with the `single-field` format if the field can have a `null` value.
Review comment:
I don't understand this comment:
```
encodes `null` values as `null` `byte[]`
```
Do you mean:
```
encodes `null` values as `null` of `byte[]` type
```
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldDeserializationSchema.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from single-field data to the Flink Table/SQL internal data structure {@link RowData}.
+ */
+@Internal
+public class SingleFieldDeserializationSchema implements DeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final LogicalType fieldType;
Review comment:
nit: call this `deserializedType`?
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldDeserializationSchema.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Deserialization schema from single-field data to the Flink Table/SQL internal data structure {@link RowData}.
+ */
+@Internal
+public class SingleFieldDeserializationSchema implements DeserializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final LogicalType fieldType;
+ private final DeserializationRuntimeConverter converter;
+ private TypeInformation<RowData> typeInfo;
+
+ private transient GenericRowData reuse;
+
+ public SingleFieldDeserializationSchema(
+ LogicalType fieldType,
+ TypeInformation<RowData> resultTypeInfo) {
+ this.fieldType = checkNotNull(fieldType);
+ this.typeInfo = checkNotNull(resultTypeInfo);
+ this.converter = createConverter(fieldType);
+ }
+
+ @Override
+ public void open(InitializationContext context) throws Exception {
+ reuse = new GenericRowData(1);
+ }
+
+ @Override
+ public RowData deserialize(byte[] message) throws IOException {
+ reuse.setField(0, converter.convert(message));
+ return reuse;
+ }
+
+ @Override
+ public boolean isEndOfStream(RowData nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<RowData> getProducedType() {
+ return typeInfo;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+        SingleFieldDeserializationSchema that = (SingleFieldDeserializationSchema) o;
+        return typeInfo.equals(that.typeInfo) && fieldType.equals(that.fieldType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(typeInfo, fieldType);
+ }
+
+    // ------------------------------------------------------------------------
+
+ /**
+     * Runtime converter that converts byte[] to an internal data structure object.
+ */
+ @FunctionalInterface
+ private interface DeserializationRuntimeConverter extends Serializable {
+ Object convert(byte[] data) throws IOException;
+ }
+
+ /**
+ * Creates a runtime converter.
+ */
+    private static DeserializationRuntimeConverter createConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return data -> {
+                // put the null check logic in the lambda instead of wrapping outside
+                // to avoid a virtual method invocation.
+ if (data == null) {
+ return null;
+ }
+ return StringData.fromBytes(data);
+ };
+ case VARBINARY:
+ case BINARY:
+ return data -> data;
+            case TINYINT:
+                return createConverterUsingSerializer(ByteSerializer.INSTANCE);
+            case SMALLINT:
+                return createConverterUsingSerializer(ShortSerializer.INSTANCE);
+            case INTEGER:
+                return createConverterUsingSerializer(IntSerializer.INSTANCE);
+            case BIGINT:
+                return createConverterUsingSerializer(LongSerializer.INSTANCE);
+            case FLOAT:
+                return createConverterUsingSerializer(FloatSerializer.INSTANCE);
+            case DOUBLE:
+                return createConverterUsingSerializer(DoubleSerializer.INSTANCE);
+            case BOOLEAN:
+                return createConverterUsingSerializer(BooleanSerializer.INSTANCE);
+ default:
+                throw new UnsupportedOperationException("The 'single-field' format currently doesn't support type: " + type);
+ }
+ }
+
+    private static DeserializationRuntimeConverter createConverterUsingSerializer(
+ final TypeSerializer<?> serializer) {
+ return new DelegatingDeserializationConverter(serializer);
+ }
+
+    private static final class DelegatingDeserializationConverter
+            implements DeserializationRuntimeConverter {
+ private static final long serialVersionUID = 1L;
+        private final DataInputDeserializer source = new DataInputDeserializer();
+ private final TypeSerializer<?> serializer;
+
+        protected DelegatingDeserializationConverter(TypeSerializer<?> serializer) {
+ this.serializer = serializer;
+ }
+
+ @Override
+ public Object convert(byte[] data) throws IOException {
+ if (data == null) {
+ return null;
+ }
+ source.setBuffer(data);
Review comment:
Would we have a performance benefit from implementing the deserialization logic ourselves, instead of setting fields and delegating to other classes?
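For comparison, a hypothetical manual decoder (the counterpart to the serialization sketch earlier) that reads the fixed-width types straight from the incoming `byte[]`, skipping `DataInputDeserializer` entirely:
```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

final class ManualDecodingSketch {
    static int decodeInt(byte[] data) {
        return ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN).getInt();
    }

    static long decodeLong(byte[] data) {
        return ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN).getLong();
    }

    static double decodeDouble(byte[] data) {
        return ByteBuffer.wrap(data).order(ByteOrder.BIG_ENDIAN).getDouble();
    }

    static boolean decodeBoolean(byte[] data) {
        return data[0] != 0;
    }
}
```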
##########
File path: flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/formats/singlefield/SingleFieldSerializationSchema.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.formats.singlefield;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.typeutils.base.ByteSerializer;
+import org.apache.flink.api.common.typeutils.base.DoubleSerializer;
+import org.apache.flink.api.common.typeutils.base.FloatSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
+import org.apache.flink.api.common.typeutils.base.ShortSerializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.RowData.FieldGetter;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.LogicalType;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Objects;
+
+/**
+ * Serialization schema that serializes a {@link RowData} object into the bytes of a single field.
+ */
+@Internal
+public class SingleFieldSerializationSchema implements SerializationSchema<RowData> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final LogicalType fieldType;
+
+ private final SerializationRuntimeConverter converter;
+
+ private final FieldGetter fieldGetter;
+
+ public SingleFieldSerializationSchema(LogicalType fieldType) {
+ this.fieldType = fieldType;
+ this.fieldGetter = RowData.createFieldGetter(fieldType, 0);
+ this.converter = createConverter(fieldType);
+ }
+
+ @Override
+ public byte[] serialize(RowData row) {
+ try {
+            return converter.convert(fieldGetter.getFieldOrNull(row));
+ } catch (IOException e) {
+            throw new RuntimeException("Could not serialize row '" + row + "'.", e);
+ }
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+        SingleFieldSerializationSchema that = (SingleFieldSerializationSchema) o;
+ return fieldType.equals(that.fieldType);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(fieldType);
+ }
+
+    // ------------------------------------------------------------------------
+
+ /**
+     * Runtime converter that converts an object of the internal data structure to byte[].
+ */
+ @FunctionalInterface
+ private interface SerializationRuntimeConverter extends Serializable {
+ byte[] convert(Object value) throws IOException;
+ }
+
+ /**
+ * Creates a runtime converter.
+ */
+    private SerializationRuntimeConverter createConverter(LogicalType type) {
+ switch (type.getTypeRoot()) {
+ case CHAR:
+ case VARCHAR:
+ return value -> {
+                // put the null check logic in the lambda instead of wrapping outside
+                // to avoid a virtual method invocation.
+ if (value == null) {
+ return null;
+ }
+ return ((StringData) value).toBytes();
Review comment:
Is this Flink's representation of strings in bytes? I could imagine that most people assume `"string".getBytes()` semantics. Is the string length included in the bytes as well?
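A quick way to check, sketched below; this assumes (but does not prove) that `StringData.toBytes()` returns plain UTF-8 bytes without a length prefix, i.e. the same bytes as `String#getBytes(StandardCharsets.UTF_8)`:
```java
import org.apache.flink.table.data.StringData;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class StringDataBytesCheck {
    public static void main(String[] args) {
        String s = "string";
        byte[] viaStringData = StringData.fromString(s).toBytes();
        byte[] viaGetBytes = s.getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(viaStringData, viaGetBytes)); // expected: true
    }
}
```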
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]