This is an automated email from the ASF dual-hosted git repository.
jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 34dd47e [FLINK-12583][python] Add support for all formats to align with the Java Table API.
34dd47e is described below
commit 34dd47e64c21397a157ca3798acd159a99845243
Author: Wei Zhong <[email protected]>
AuthorDate: Thu May 30 18:20:58 2019 +0800
[FLINK-12583][python] Add support for all formats to align with the Java Table API.
This closes #8575
---
.../main/flink-bin/bin/pyflink-gateway-server.sh | 13 +-
flink-python/dev/lint-python.sh | 19 ++
flink-python/pyflink/table/__init__.py | 6 +-
flink-python/pyflink/table/table_descriptor.py | 242 +++++++++++++++++++-
.../pyflink/table/tests/test_descriptor.py | 249 ++++++++++++++++++++-
tools/travis_controller.sh | 3 +
6 files changed, 525 insertions(+), 7 deletions(-)
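
For context before the diffs: with this change, the new format descriptors can be combined with the existing connector descriptors from Python. A minimal sketch of the intended usage (the Kafka settings, field names, and table name are illustrative, not part of this commit, and an existing TableEnvironment `t_env` is assumed):

    from pyflink.table import Json, Kafka, Schema, DataTypes

    # Declare a Kafka-backed source whose messages are parsed as JSON;
    # the format schema is derived from the table schema below.
    t_env.connect(Kafka().version("0.11").topic("user_events").start_from_earliest()) \
        .with_format(Json().derive_schema()) \
        .with_schema(Schema()
                     .field("a", DataTypes.INT())
                     .field("b", DataTypes.STRING())) \
        .register_table_source("json_source")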
diff --git a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
index 078c757..5fff644 100644
--- a/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
+++ b/flink-dist/src/main/flink-bin/bin/pyflink-gateway-server.sh
@@ -58,13 +58,24 @@ if [[ -n "$FLINK_TESTING" ]]; then
bin=`dirname "$0"`
FLINK_SOURCE_ROOT_DIR=`cd "$bin/../../"; pwd -P`
+    FIND_EXPRESSION=""
+    FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-formats/flink-csv/target/flink-csv*.jar"
+    FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-formats/flink-avro/target/flink-avro*.jar"
+    FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-formats/flink-avro/target/avro*.jar"
+    FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-formats/flink-json/target/flink-json*.jar"
+    FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-elasticsearch-base/target/flink*.jar"
+    FIND_EXPRESSION="$FIND_EXPRESSION -o -path ${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-kafka-base/target/flink*.jar"
+
+    # Temporarily disable wildcard expansion so the glob patterns in FIND_EXPRESSION reach find unexpanded.
+ set -f
while read -d '' -r testJarFile ; do
if [[ "$FLINK_TEST_CLASSPATH" == "" ]]; then
FLINK_TEST_CLASSPATH="$testJarFile";
else
FLINK_TEST_CLASSPATH="$FLINK_TEST_CLASSPATH":"$testJarFile"
fi
-    done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d \( -name 'flink-*-tests.jar' -o -path "${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-elasticsearch-base/target/flink*.jar" -o -path "${FLINK_SOURCE_ROOT_DIR}/flink-connectors/flink-connector-kafka-base/target/flink*.jar" \) -print0 | sort -z)
+    done < <(find "$FLINK_SOURCE_ROOT_DIR" ! -type d \( -name 'flink-*-tests.jar'${FIND_EXPRESSION} \) -print0 | sort -z)
+ set +f
fi
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -cp ${FLINK_CLASSPATH}:${TABLE_JAR_PATH}:${PYTHON_JAR_PATH}:${FLINK_TEST_CLASSPATH} ${DRIVER} ${ARGS[@]}
diff --git a/flink-python/dev/lint-python.sh b/flink-python/dev/lint-python.sh
index b6db4be..23b1b6f 100755
--- a/flink-python/dev/lint-python.sh
+++ b/flink-python/dev/lint-python.sh
@@ -411,6 +411,23 @@ function check_stage() {
echo "All the checks are finished, the detailed information can be found
in: $LOG_FILE"
}
+function download_java_dependencies() {
+ INITIAL_DIR=`pwd`
+ cd "$CURRENT_DIR/../../"
+    AVRO_VERSION=`mvn help:evaluate -Dexpression=avro.version | grep --invert-match -E '\[|Download*'`
+    if [ ! -f `pwd`/flink-formats/flink-avro/target/avro-"$AVRO_VERSION".jar ]; then
+        echo "Downloading avro-$AVRO_VERSION.jar..."
+        mvn org.apache.maven.plugins:maven-dependency-plugin:2.10:copy -Dartifact=org.apache.avro:avro:"$AVRO_VERSION":jar -DoutputDirectory=`pwd`/flink-formats/flink-avro/target
+        if [ ! -f `pwd`/flink-formats/flink-avro/target/avro-"$AVRO_VERSION".jar ]; then
+            echo "Failed to download avro-$AVRO_VERSION.jar."
+ else
+ echo "avro-$AVRO_VERSION.jar downloaded."
+ fi
+ else
+ echo "avro-$AVRO_VERSION.jar already exists, no need to download."
+ fi
+ cd "$INITIAL_DIR"
+}
###############################################################All Checks Definitions###############################################################
#########################
@@ -581,5 +598,7 @@ fi
# install environment
install_environment
+download_java_dependencies
+
# exec all selected checks
check_stage
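
Why lint-python now needs the avro jar: the new Avro descriptor (added to table_descriptor.py below) resolves the record class inside the gateway JVM rather than in Python, so the jar must be on the test classpath. A rough sketch of the lookup it performs, assuming a running gateway (the class name is the example used in the tests):

    from pyflink.java_gateway import get_gateway

    gateway = get_gateway()
    # Py4J surfaces a ClassNotFoundException here if avro-<version>.jar
    # is missing from the gateway JVM's classpath.
    clz = gateway.jvm.Thread.currentThread().getContextClassLoader() \
        .loadClass("org.apache.flink.formats.avro.generated.Address")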
diff --git a/flink-python/pyflink/table/__init__.py b/flink-python/pyflink/table/__init__.py
index 946f00c..3b74ee2 100644
--- a/flink-python/pyflink/table/__init__.py
+++ b/flink-python/pyflink/table/__init__.py
@@ -40,7 +40,8 @@ from pyflink.table.table_sink import TableSink, CsvTableSink
from pyflink.table.table_source import TableSource, CsvTableSource
from pyflink.table.types import DataTypes, UserDefinedType, Row
from pyflink.table.window import Tumble, Session, Slide, Over
-from pyflink.table.table_descriptor import Rowtime, Schema, OldCsv, FileSystem, Kafka, Elasticsearch
+from pyflink.table.table_descriptor import (Rowtime, Schema, OldCsv, Csv, Avro, Json, FileSystem,
+                                            Kafka, Elasticsearch)
from pyflink.table.table_schema import TableSchema
__all__ = [
@@ -61,6 +62,9 @@ __all__ = [
'Rowtime',
'Schema',
'OldCsv',
+ 'Csv',
+ 'Avro',
+ 'Json',
'FileSystem',
'UserDefinedType',
'Row',
diff --git a/flink-python/pyflink/table/table_descriptor.py b/flink-python/pyflink/table/table_descriptor.py
index 8c6401b..9570a31 100644
--- a/flink-python/pyflink/table/table_descriptor.py
+++ b/flink-python/pyflink/table/table_descriptor.py
@@ -32,7 +32,10 @@ __all__ = [
     'OldCsv',
     'FileSystem',
     'Kafka',
-    'Elasticsearch'
+    'Elasticsearch',
+    'Csv',
+    'Avro',
+    'Json'
 ]
@@ -103,8 +106,8 @@ class Rowtime(Descriptor):
         """
         Sets a custom timestamp extractor to be used for the rowtime attribute.
-        :param extractor: The java canonical class name of the TimestampExtractor to extract the
-                          rowtime attribute from the physical type. The TimestampExtractor must
+        :param extractor: The java fully-qualified class name of the TimestampExtractor to extract
+                          the rowtime attribute from the physical type. The TimestampExtractor must
                           have a public no-argument constructor and can be found by
                           the current Java classloader.
         :return: This rowtime descriptor.
@@ -154,7 +157,7 @@ class Rowtime(Descriptor):
         """
         Sets a custom watermark strategy to be used for the rowtime attribute.
-        :param strategy: The java fully-qualified class name of the WatermarkStrategy. The
+        :param strategy: The java fully-qualified class name of the WatermarkStrategy. The
                          WatermarkStrategy must have a public no-argument constructor and can be
                          found by the current Java classloader.
         :return: This rowtime descriptor.
@@ -368,6 +371,236 @@ class OldCsv(FormatDescriptor):
return self
+class Csv(FormatDescriptor):
+    """
+    Format descriptor for comma-separated values (CSV).
+
+    This descriptor aims to comply with RFC-4180 ("Common Format and MIME Type for
+    Comma-Separated Values (CSV) Files") proposed by the Internet Engineering Task Force (IETF).
+
+    .. note::
+        This descriptor does not describe Flink's old non-standard CSV table
+        source/sink. Currently, this descriptor can be used when writing to Kafka. The old one is
+        still available as :class:`OldCsv` for stream/batch filesystem operations.
+    """
+
+ def __init__(self):
+ gateway = get_gateway()
+ self._j_csv = gateway.jvm.Csv()
+ super(Csv, self).__init__(self._j_csv)
+
+ def field_delimiter(self, delimiter):
+ """
+ Sets the field delimiter character (',' by default).
+
+ :param delimiter: The field delimiter character.
+ :return: This :class:`Csv` object.
+ """
+ if not isinstance(delimiter, (str, unicode)) or len(delimiter) != 1:
+ raise TypeError("Only one-character string is supported!")
+ self._j_csv = self._j_csv.fieldDelimiter(delimiter)
+ return self
+
+ def line_delimiter(self, delimiter):
+ """
+        Sets the line delimiter ("\n" by default; otherwise "\r" or "\r\n" are allowed).
+
+ :param delimiter: The line delimiter.
+ :return: This :class:`Csv` object.
+ """
+ self._j_csv = self._j_csv.lineDelimiter(delimiter)
+ return self
+
+ def quote_character(self, quote_character):
+ """
+        Sets the quote character for enclosing field values ('"' by default).
+
+ :param quote_character: The quote character.
+ :return: This :class:`Csv` object.
+ """
+        if not isinstance(quote_character, (str, unicode)) or len(quote_character) != 1:
+ raise TypeError("Only one-character string is supported!")
+ self._j_csv = self._j_csv.quoteCharacter(quote_character)
+ return self
+
+ def allow_comments(self):
+ """
+        Ignores comment lines that start with '#' (disabled by default). If enabled, make sure to
+        also ignore parse errors to allow empty rows.
+
+ :return: This :class:`Csv` object.
+ """
+ self._j_csv = self._j_csv.allowComments()
+ return self
+
+ def ignore_parse_errors(self):
+ """
+        Skips records with parse errors instead of failing. By default, an exception is thrown.
+
+ :return: This :class:`Csv` object.
+ """
+ self._j_csv = self._j_csv.ignoreParseErrors()
+ return self
+
+ def array_element_delimiter(self, delimiter):
+ """
+        Sets the array element delimiter string for separating array or row element
+        values (";" by default).
+
+ :param delimiter: The array element delimiter.
+ :return: This :class:`Csv` object.
+ """
+ self._j_csv = self._j_csv.arrayElementDelimiter(delimiter)
+ return self
+
+ def escape_character(self, escape_character):
+ """
+ Sets the escape character for escaping values (disabled by default).
+
+ :param escape_character: Escaping character (e.g. backslash).
+ :return: This :class:`Csv` object.
+ """
+        if not isinstance(escape_character, (str, unicode)) or len(escape_character) != 1:
+ raise TypeError("Only one-character string is supported!")
+ self._j_csv = self._j_csv.escapeCharacter(escape_character)
+ return self
+
+ def null_literal(self, null_literal):
+ """
+        Sets the null literal string that is interpreted as a null value (disabled by default).
+
+        :param null_literal: The string literal to interpret as a null value.
+ :return: This :class:`Csv` object.
+ """
+ self._j_csv = self._j_csv.nullLiteral(null_literal)
+ return self
+
+ def schema(self, schema_data_type):
+ """
+        Sets the format schema with field names and the types. Required if schema is not derived.
+
+        :param schema_data_type: Data type from :class:`DataTypes` that describes the schema.
+ :return: This :class:`Csv` object.
+ """
+ self._j_csv = self._j_csv.schema(_to_java_type(schema_data_type))
+ return self
+
+ def derive_schema(self):
+ """
+        Derives the format schema from the table's schema. Required if no format schema is defined.
+
+        This allows for defining schema information only once.
+
+        The names, types, and fields' order of the format are determined by the table's
+        schema. Time attributes are ignored if their origin is not a field. A "from" definition
+        is interpreted as a field renaming in the format.
+
+ :return: This :class:`Csv` object.
+ """
+ self._j_csv = self._j_csv.deriveSchema()
+ return self
+
+
+class Avro(FormatDescriptor):
+ """
+ Format descriptor for Apache Avro records.
+ """
+
+ def __init__(self):
+ gateway = get_gateway()
+ self._j_avro = gateway.jvm.Avro()
+ super(Avro, self).__init__(self._j_avro)
+
+ def record_class(self, record_class):
+ """
+ Sets the class of the Avro specific record.
+
+        :param record_class: The java fully-qualified class name of the Avro record.
+ :return: This object.
+ """
+ gateway = get_gateway()
+        clz = gateway.jvm.Thread.currentThread().getContextClassLoader().loadClass(record_class)
+ self._j_avro = self._j_avro.recordClass(clz)
+ return self
+
+ def avro_schema(self, avro_schema):
+ """
+ Sets the Avro schema for specific or generic Avro records.
+
+ :param avro_schema: Avro schema string.
+ :return: This object.
+ """
+ self._j_avro = self._j_avro.avroSchema(avro_schema)
+ return self
+
+
+class Json(FormatDescriptor):
+ """
+ Format descriptor for JSON.
+ """
+
+ def __init__(self):
+ gateway = get_gateway()
+ self._j_json = gateway.jvm.Json()
+ super(Json, self).__init__(self._j_json)
+
+ def fail_on_missing_field(self, fail_on_missing_field):
+ """
+        Sets the flag that specifies whether to fail if a field is missing or not.
+
+        :param fail_on_missing_field: If set to ``True``, the operation fails if there is a missing
+                                      field.
+                                      If set to ``False``, a missing field is set to null.
+ :return: This object.
+ """
+ if not isinstance(fail_on_missing_field, bool):
+ raise TypeError("Only bool value is supported!")
+ self._j_json = self._j_json.failOnMissingField(fail_on_missing_field)
+ return self
+
+ def json_schema(self, json_schema):
+ """
+        Sets the JSON schema string with field names and the types according to the JSON schema
+        specification: http://json-schema.org/specification.html
+
+ The schema might be nested.
+
+ :param json_schema: The JSON schema string.
+ :return: This object.
+ """
+ self._j_json = self._j_json.jsonSchema(json_schema)
+ return self
+
+ def schema(self, schema_data_type):
+ """
+ Sets the schema using :class:`DataTypes`.
+
+ JSON objects are represented as ROW types.
+
+ The schema might be nested.
+
+ :param schema_data_type: Data type that describes the schema.
+ :return: This object.
+ """
+ self._j_json = self._j_json.schema(_to_java_type(schema_data_type))
+ return self
+
+ def derive_schema(self):
+ """
+        Derives the format schema from the table's schema.
+
+ This allows for defining schema information only once.
+
+        The names, types, and fields' order of the format are determined by the table's
+        schema. Time attributes are ignored if their origin is not a field. A "from" definition
+        is interpreted as a field renaming in the format.
+
+ :return: This object.
+ """
+ self._j_json = self._j_json.deriveSchema()
+ return self
+
+
class ConnectorDescriptor(Descriptor):
"""
    Describes a connector to another system.
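
Before moving to the tests: each descriptor above is a thin wrapper that forwards to its Java counterpart and ultimately flattens into string properties. A small sketch of that round trip, mirroring the assertions in test_descriptor.py below (the exact property map is what the tests pin down):

    from pyflink.table import Csv, DataTypes

    csv = Csv() \
        .field_delimiter("|") \
        .ignore_parse_errors() \
        .schema(DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT())]))

    # Expected to yield roughly:
    # {'format.type': 'csv', 'format.property-version': '1',
    #  'format.field-delimiter': '|', 'format.ignore-parse-errors': 'true',
    #  'format.schema': 'ROW<a INT>'}
    print(csv.to_properties())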
diff --git a/flink-python/pyflink/table/tests/test_descriptor.py
b/flink-python/pyflink/table/tests/test_descriptor.py
index 5f1262a..31e133b 100644
--- a/flink-python/pyflink/table/tests/test_descriptor.py
+++ b/flink-python/pyflink/table/tests/test_descriptor.py
@@ -18,7 +18,7 @@
import os
 from pyflink.table.table_descriptor import (FileSystem, OldCsv, Rowtime, Schema, Kafka,
-                                            Elasticsearch)
+                                            Elasticsearch, Csv, Avro, Json)
from pyflink.table.table_schema import TableSchema
from pyflink.table.table_sink import CsvTableSink
from pyflink.table.types import DataTypes
@@ -524,6 +524,253 @@ class OldCsvDescriptorTests(PyFlinkTestCase):
assert properties == expected
+class CsvDescriptorTests(PyFlinkTestCase):
+
+ def test_field_delimiter(self):
+ csv = Csv()
+
+ csv = csv.field_delimiter("|")
+
+ properties = csv.to_properties()
+ expected = {'format.field-delimiter': '|',
+ 'format.type': 'csv',
+ 'format.property-version': '1'}
+ assert properties == expected
+
+ def test_line_delimiter(self):
+ csv = Csv()
+
+ csv = csv.line_delimiter(";")
+
+ expected = {'format.line-delimiter': ';',
+ 'format.property-version': '1',
+ 'format.type': 'csv'}
+
+ properties = csv.to_properties()
+ assert properties == expected
+
+ def test_quote_character(self):
+ csv = Csv()
+
+ csv = csv.quote_character("'")
+
+ expected = {'format.quote-character': "'",
+ 'format.property-version': '1',
+ 'format.type': 'csv'}
+
+ properties = csv.to_properties()
+ assert properties == expected
+
+ def test_allow_comments(self):
+ csv = Csv()
+
+ csv = csv.allow_comments()
+
+ expected = {'format.allow-comments': 'true',
+ 'format.property-version': '1',
+ 'format.type': 'csv'}
+
+ properties = csv.to_properties()
+ assert properties == expected
+
+ def test_ignore_parse_errors(self):
+ csv = Csv()
+
+ csv = csv.ignore_parse_errors()
+
+ expected = {'format.ignore-parse-errors': 'true',
+ 'format.property-version': '1',
+ 'format.type': 'csv'}
+
+ properties = csv.to_properties()
+ assert properties == expected
+
+ def test_array_element_delimiter(self):
+ csv = Csv()
+
+ csv = csv.array_element_delimiter("/")
+
+ expected = {'format.array-element-delimiter': '/',
+ 'format.property-version': '1',
+ 'format.type': 'csv'}
+
+ properties = csv.to_properties()
+ assert properties == expected
+
+ def test_escape_character(self):
+ csv = Csv()
+
+ csv = csv.escape_character("\\")
+
+ expected = {'format.escape-character': '\\',
+ 'format.property-version': '1',
+ 'format.type': 'csv'}
+
+ properties = csv.to_properties()
+ assert properties == expected
+
+ def test_null_literal(self):
+ csv = Csv()
+
+ csv = csv.null_literal("null")
+
+ expected = {'format.null-literal': 'null',
+ 'format.property-version': '1',
+ 'format.type': 'csv'}
+
+ properties = csv.to_properties()
+ assert properties == expected
+
+ def test_schema(self):
+ csv = Csv()
+
+        csv = csv.schema(DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
+                                        DataTypes.FIELD("b", DataTypes.STRING())]))
+
+ expected = {'format.schema': 'ROW<a INT, b VARCHAR>',
+ 'format.property-version': '1',
+ 'format.type': 'csv'}
+
+ properties = csv.to_properties()
+ assert properties == expected
+
+ def test_derive_schema(self):
+ csv = Csv()
+
+ csv = csv.derive_schema()
+
+ expected = {'format.derive-schema': 'true',
+ 'format.property-version': '1',
+ 'format.type': 'csv'}
+
+ properties = csv.to_properties()
+ assert properties == expected
+
+
+class AvroDescriptorTests(PyFlinkTestCase):
+
+ def test_record_class(self):
+ avro = Avro()
+
+        avro = avro.record_class("org.apache.flink.formats.avro.generated.Address")
+
+        expected = {'format.record-class': 'org.apache.flink.formats.avro.generated.Address',
+ 'format.property-version': '1',
+ 'format.type': 'avro'}
+
+ properties = avro.to_properties()
+ assert properties == expected
+
+ def test_avro_schema(self):
+ avro = Avro()
+
+ avro = avro.avro_schema('{"type":"record",'
+ '"name":"Address",'
+                                '"namespace":"org.apache.flink.formats.avro.generated",'
+ '"fields":['
+ '{"name":"num","type":"int"},'
+ '{"name":"street","type":"string"},'
+ '{"name":"city","type":"string"},'
+ '{"name":"state","type":"string"},'
+ '{"name":"zip","type":"string"}'
+ ']}')
+
+ expected = {'format.avro-schema': '{"type":"record",'
+ '"name":"Address",'
+                                        '"namespace":"org.apache.flink.formats.avro.generated",'
+ '"fields":['
+ '{"name":"num","type":"int"},'
+ '{"name":"street","type":"string"},'
+ '{"name":"city","type":"string"},'
+ '{"name":"state","type":"string"},'
+ '{"name":"zip","type":"string"}'
+ ']}',
+ 'format.property-version': '1',
+ 'format.type': 'avro'}
+
+ properties = avro.to_properties()
+ assert properties == expected
+
+
+class JsonDescriptorTests(PyFlinkTestCase):
+
+ def test_fail_on_missing_field_true(self):
+ json = Json()
+
+ json = json.fail_on_missing_field(True)
+
+ expected = {'format.fail-on-missing-field': 'true',
+ 'format.property-version': '1',
+ 'format.type': 'json'}
+
+ properties = json.to_properties()
+ assert properties == expected
+
+ def test_json_schema(self):
+ json = Json()
+
+ json = json.json_schema("{"
+ "'title': 'Fruit',"
+ "'type': 'object',"
+ "'properties': "
+ "{"
+ "'name': {'type': 'string'},"
+ "'count': {'type': 'integer'},"
+ "'time': "
+ "{"
+ "'description': 'row time',"
+ "'type': 'string',"
+ "'format': 'date-time'"
+ "}"
+ "},"
+ "'required': ['name', 'count', 'time']"
+ "}")
+
+ expected = {'format.json-schema':
+ "{"
+ "'title': 'Fruit',"
+ "'type': 'object',"
+ "'properties': {"
+ "'name': {'type': 'string'},"
+ "'count': {'type': 'integer'},"
+ "'time': {"
+ "'description': 'row time',"
+ "'type': 'string',"
+ "'format': 'date-time'}"
+ "},"
+ "'required': ['name', 'count', 'time']}",
+ 'format.property-version': '1',
+ 'format.type': 'json'}
+
+ properties = json.to_properties()
+ assert properties == expected
+
+ def test_schema(self):
+ json = Json()
+
+        json = json.schema(DataTypes.ROW([DataTypes.FIELD("a", DataTypes.INT()),
+                                          DataTypes.FIELD("b", DataTypes.STRING())]))
+
+ expected = {'format.schema': 'ROW<a INT, b VARCHAR>',
+ 'format.property-version': '1',
+ 'format.type': 'json'}
+
+ properties = json.to_properties()
+ assert properties == expected
+
+ def test_derive_schema(self):
+ json = Json()
+
+ json = json.derive_schema()
+
+ expected = {'format.derive-schema': 'true',
+ 'format.property-version': '1',
+ 'format.type': 'json'}
+
+ properties = json.to_properties()
+ assert properties == expected
+
+
class RowTimeDescriptorTests(PyFlinkTestCase):
def test_timestamps_from_field(self):
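
One note on the two Avro configuration paths exercised in the tests above: record_class() requires the generated specific-record class on the JVM classpath (hence the lint-python change earlier), while avro_schema() takes a plain schema string for generic records. A hedged sketch reusing the test's Address example (typically only one of the two is configured):

    from pyflink.table import Avro

    # Specific record: the class is resolved in the gateway JVM.
    avro = Avro().record_class("org.apache.flink.formats.avro.generated.Address")

    # Generic record: schema string only, no generated class needed
    # (trimmed to one field for brevity).
    avro = Avro().avro_schema(
        '{"type":"record","name":"Address",'
        '"namespace":"org.apache.flink.formats.avro.generated",'
        '"fields":[{"name":"num","type":"int"}]}')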
diff --git a/tools/travis_controller.sh b/tools/travis_controller.sh
index c13d726..80568f1 100755
--- a/tools/travis_controller.sh
+++ b/tools/travis_controller.sh
@@ -159,6 +159,9 @@ if [ $STAGE == "$STAGE_COMPILE" ]; then
     # jars are re-built in subsequent stages, so no need to cache them (cannot be avoided)
     find "$CACHE_FLINK_DIR" -maxdepth 8 -type f -name '*.jar' \
+    ! -path "$CACHE_FLINK_DIR/flink-formats/flink-csv/target/flink-csv*.jar" \
+    ! -path "$CACHE_FLINK_DIR/flink-formats/flink-json/target/flink-json*.jar" \
+    ! -path "$CACHE_FLINK_DIR/flink-formats/flink-avro/target/flink-avro*.jar" \
     ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/lib/flink-dist*.jar" \
     ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/opt/flink-table*.jar" \
     ! -path "$CACHE_FLINK_DIR/flink-dist/target/flink-*-bin/flink-*/opt/flink-python*java-binding.jar" \