This is an automated email from the ASF dual-hosted git repository.
amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 1e5e780 [BEAM-8406] Add support for JSON format text tables
new 13858b4 Merge pull request #10217 from milantracy/beam8406
1e5e780 is described below
commit 1e5e780a0107c88704ff2865b60d33d0f5cfeb1a
Author: Jing Chen <[email protected]>
AuthorDate: Tue Nov 26 00:45:49 2019 -0800
[BEAM-8406] Add support for JSON format text tables
---
.../sql/meta/provider/text/TextJsonTable.java | 42 ++++++++++
.../sql/meta/provider/text/TextTableProvider.java | 98 +++++++++++++++++++++-
.../meta/provider/text/TextTableProviderTest.java | 68 +++++++++++++++
3 files changed, 206 insertions(+), 2 deletions(-)
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextJsonTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextJsonTable.java
new file mode 100644
index 0000000..a59c337
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextJsonTable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.text;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider.JsonToRow;
+import org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider.RowToJson;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * {@link TextJsonTable} is a {@link BeamSqlTable} that reads text files and converts them according
+ * to the JSON format.
+ *
+ * <p>Support format is {@code "json"}.
+ *
+ * <p>Check {@link ObjectMapper} javadoc for more info on reading and writing JSON.
+ */
+@Internal
+public class TextJsonTable extends TextTable {
+
+ public TextJsonTable(
+ Schema schema, String filePattern, JsonToRow readConverter, RowToJson writerConverter) {
+ super(schema, filePattern, readConverter, writerConverter);
+ }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index 229e7cc..e262bfe 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -19,23 +19,38 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.text;
import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
import static org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLines2BeamRows;
+import static org.apache.beam.sdk.util.RowJsonUtils.jsonToRow;
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
import static org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
import java.io.Serializable;
import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
import org.apache.beam.sdk.extensions.sql.meta.Table;
import org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ToJson;
+import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer;
+import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer.UnsupportedRowJsonException;
import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.MoreObjects;
@@ -73,11 +88,12 @@ public class TextTableProvider extends InMemoryMetaTableProvider {
String filePattern = table.getLocation();
JSONObject properties = table.getProperties();
String format = MoreObjects.firstNonNull(properties.getString("format"), "csv");
+ String deadLetterFile = properties.getString("deadLetterFile");
// Backwards compatibility: previously "type": "text" meant CSV and "format" was where the
// CSV format went. So assume that any other format is the CSV format.
@Nullable String legacyCsvFormat = null;
- if (!ImmutableSet.of("csv", "lines").contains(format)) {
+ if (!ImmutableSet.of("csv", "lines", "json").contains(format)) {
legacyCsvFormat = format;
format = "csv";
}
@@ -93,6 +109,9 @@ public class TextTableProvider extends InMemoryMetaTableProvider {
: CSVFormat.DEFAULT);
return new TextTable(
schema, filePattern, new CsvToRow(schema, csvFormat), new RowToCsv(csvFormat));
+ case "json":
+ return new TextJsonTable(
+ schema, filePattern, JsonToRow.create(schema, deadLetterFile), RowToJson.create());
case "lines":
checkArgument(
schema.getFieldCount() == 1
@@ -103,7 +122,7 @@ public class TextTableProvider extends InMemoryMetaTableProvider {
schema, filePattern, new LinesReadConverter(), new LinesWriteConverter());
default:
throw new IllegalArgumentException(
- "Table with type 'text' must have format 'csv' or 'lines'");
+ "Table with type 'text' must have format 'csv' or 'lines' or 'json'");
}
}
@@ -141,6 +160,81 @@ public class TextTableProvider extends InMemoryMetaTableProvider {
}
}
+ /** Read-side converter for {@link TextJsonTable} with format {@code 'json'}. */
+ @AutoValue
+ @Internal
+ abstract static class JsonToRow extends PTransform<PCollection<String>, PCollection<Row>>
+ implements Serializable {
+ protected static final TupleTag<String> DLF_TAG = new TupleTag<>();
+ protected static final TupleTag<Row> MAIN_TAG = new TupleTag<>();
+
+ public abstract Schema schema();
+
+ @Nullable
+ public abstract String deadLetterFile();
+
+ public static JsonToRow create(Schema schema, @Nullable String deadLetterFile) {
+ return new AutoValue_TextTableProvider_JsonToRow(schema, deadLetterFile);
+ }
+
+ public static JsonToRow create(Schema schema) {
+ return create(schema, null);
+ }
+
+ @Override
+ public PCollection<Row> expand(PCollection<String> input) {
+ PCollectionTuple rows =
+ input.apply(
+ ParDo.of(
+ new DoFn<String, Row>() {
+ @ProcessElement
+ public void processElement(ProcessContext context) {
+ try {
+ context.output(jsonToRow(getObjectMapper(), context.element()));
+ } catch (UnsupportedRowJsonException jsonException) {
+ if (deadLetterFile() != null) {
+ context.output(DLF_TAG, context.element());
+ } else {
+ throw new RuntimeException("Error parsing JSON", jsonException);
+ }
+ }
+ }
+ })
+ .withOutputTags(
+ MAIN_TAG,
+ deadLetterFile() != null ? TupleTagList.of(DLF_TAG) : TupleTagList.empty()));
+
+ if (deadLetterFile() != null) {
+ rows.get(DLF_TAG).setCoder(StringUtf8Coder.of()).apply(writeJsonToDlf());
+ }
+ return rows.get(MAIN_TAG).setRowSchema(schema());
+ }
+
+ private TextIO.Write writeJsonToDlf() {
+ return TextIO.write().withDelimiter(new char[] {}).to(deadLetterFile());
+ }
+
+ private ObjectMapper getObjectMapper() {
+ return newObjectMapperWith(RowJsonDeserializer.forSchema(schema()));
+ }
+ }
+
+ /** Write-side converter for {@link TextJsonTable} with format {@code 'json'}. */
+ @AutoValue
+ @Internal
+ abstract static class RowToJson extends PTransform<PCollection<Row>, PCollection<String>>
+ implements Serializable {
+
+ public static RowToJson create() {
+ return new AutoValue_TextTableProvider_RowToJson();
+ }
+
+ @Override
+ public PCollection<String> expand(PCollection<Row> input) {
+ return input.apply(ToJson.of());
+ }
+ }
+
/** Write-side converter for {@link TextTable} with format {@code 'csv'}. */
@VisibleForTesting
static class RowToCsv extends PTransform<PCollection<Row>, PCollection<String>>
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
index 172bdb1..1c1b6ba 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
@@ -61,6 +61,12 @@ public class TextTableProviderTest {
private static final Schema LINES_SCHEMA =
Schema.builder().addStringField("f_string").build();
private static final String SQL_LINES_SCHEMA = "(f_string VARCHAR)";
+ private static final Schema JSON_SCHEMA =
+ Schema.builder().addStringField("name").addInt32Field("age").build();
+ private static final String SQL_JSON_SCHEMA = "(name VARCHAR, age INTEGER)";
+ private static final String JSON_TEXT = "{\"name\":\"Jack\",\"age\":13}";
+ private static final String INVALID_JSON_TEXT = "{\"name\":\"Jack\",\"age\":\"thirteen\"}";
+
// Even though these have the same schema as LINES_SCHEMA, that is accidental; they exist for a
// different purpose, to test Excel CSV format that does not ignore empty lines
private static final Schema SINGLE_STRING_CSV_SCHEMA =
@@ -210,6 +216,49 @@ public class TextTableProviderTest {
}
@Test
+ public void testJson() throws Exception {
+ Files.write(tempFolder.newFile("test.json").toPath(), JSON_TEXT.getBytes(Charsets.UTF_8));
+
+ BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+ env.executeDdl(
+ String.format(
"CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*' TBLPROPERTIES '{\"format\":\"json\"}'",
+ SQL_JSON_SCHEMA, tempFolder.getRoot()));
+
+ PCollection<Row> rows =
+ BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM test"));
+
+ PAssert.that(rows)
+ .containsInAnyOrder(Row.withSchema(JSON_SCHEMA).addValues("Jack", 13).build());
+ pipeline.run();
+ }
+
+ @Test
+ public void testInvalidJson() throws Exception {
+ File deadLetterFile = new File(tempFolder.getRoot(), "dead-letter-file");
+ Files.write(
+ tempFolder.newFile("test.json").toPath(), INVALID_JSON_TEXT.getBytes(Charsets.UTF_8));
+
+ BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+ env.executeDdl(
+ String.format(
+ "CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*' "
+ + "TBLPROPERTIES '{\"format\":\"json\", \"deadLetterFile\": \"%s\"}'",
+ SQL_JSON_SCHEMA, tempFolder.getRoot(), deadLetterFile.getAbsoluteFile()));
+
+ PCollection<Row> rows =
+ BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM test"));
+
+ PAssert.that(rows).empty();
+
+ pipeline.run();
+ assertThat(
+ new NumberedShardedFile(deadLetterFile.getAbsoluteFile() + "*")
+ .readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
+ containsInAnyOrder(INVALID_JSON_TEXT));
+ }
+
+ @Test
public void testWriteLines() throws Exception {
File destinationFile = new File(tempFolder.getRoot(), "lines-outputs");
BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
@@ -248,4 +297,23 @@ public class TextTableProviderTest {
.readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
containsInAnyOrder("hello,42", "goodbye,13"));
}
+
+ @Test
+ public void testWriteJson() throws Exception {
+ File destinationFile = new File(tempFolder.getRoot(), "json-outputs");
+ BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+ env.executeDdl(
+ String.format(
"CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' TBLPROPERTIES '{\"format\":\"json\"}'",
+ SQL_JSON_SCHEMA, destinationFile.getAbsolutePath()));
+
+ BeamSqlRelUtils.toPCollection(
+ pipeline, env.parseQuery("INSERT INTO test(name, age) VALUES ('Jack', 13)"));
+ pipeline.run();
+
+ assertThat(
+ new NumberedShardedFile(destinationFile.getAbsolutePath() + "*")
+ .readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
+ containsInAnyOrder(JSON_TEXT));
+ }
}