This is an automated email from the ASF dual-hosted git repository.

amaliujia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e5e780  [BEAM-8406] Add support for JSON format text tables
     new 13858b4  Merge pull request #10217 from milantracy/beam8406
1e5e780 is described below

commit 1e5e780a0107c88704ff2865b60d33d0f5cfeb1a
Author: Jing Chen <[email protected]>
AuthorDate: Tue Nov 26 00:45:49 2019 -0800

    [BEAM-8406] Add support for JSON format text tables
---
 .../sql/meta/provider/text/TextJsonTable.java      | 42 ++++++++++
 .../sql/meta/provider/text/TextTableProvider.java  | 98 +++++++++++++++++++++-
 .../meta/provider/text/TextTableProviderTest.java  | 68 +++++++++++++++
 3 files changed, 206 insertions(+), 2 deletions(-)

diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextJsonTable.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextJsonTable.java
new file mode 100644
index 0000000..a59c337
--- /dev/null
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextJsonTable.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.text;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider.JsonToRow;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.text.TextTableProvider.RowToJson;
+import org.apache.beam.sdk.schemas.Schema;
+
+/**
+ * {@link TextJsonTable} is a {@link BeamSqlTable} backed by text files whose records are JSON
+ * objects.
+ *
+ * <p>Supported format is {@code "json"}. Records are converted to {@code Row}s by the supplied
+ * {@link JsonToRow} and back to JSON strings by the supplied {@link RowToJson}.
+ *
+ * <p>Check {@link ObjectMapper} javadoc for more info on reading and writing JSON.
+ */
+@Internal
+public class TextJsonTable extends TextTable {
+
+  /**
+   * Creates a JSON text table.
+   *
+   * @param schema schema of the table's rows
+   * @param filePattern location of the backing text files
+   * @param readConverter converts JSON strings read from the files into rows
+   * @param writeConverter converts rows into JSON strings for writing
+   */
+  public TextJsonTable(
+      Schema schema, String filePattern, JsonToRow readConverter, RowToJson writeConverter) {
+    super(schema, filePattern, readConverter, writeConverter);
+  }
+}
diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
index 229e7cc..e262bfe 100644
--- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
+++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProvider.java
@@ -19,23 +19,38 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.text;
 
 import static 
org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.beamRow2CsvLine;
 import static 
org.apache.beam.sdk.extensions.sql.impl.schema.BeamTableUtils.csvLines2BeamRows;
+import static org.apache.beam.sdk.util.RowJsonUtils.jsonToRow;
+import static org.apache.beam.sdk.util.RowJsonUtils.newObjectMapperWith;
 import static 
org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.Preconditions.checkArgument;
 
 import com.alibaba.fastjson.JSONObject;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.auto.service.AutoService;
+import com.google.auto.value.AutoValue;
 import java.io.Serializable;
 import javax.annotation.Nullable;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
 import org.apache.beam.sdk.extensions.sql.meta.Table;
 import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.TypeName;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.FlatMapElements;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.ToJson;
+import org.apache.beam.sdk.util.RowJson.RowJsonDeserializer;
+import 
org.apache.beam.sdk.util.RowJson.RowJsonDeserializer.UnsupportedRowJsonException;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
 import org.apache.beam.sdk.values.TypeDescriptors;
 import 
org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting;
 import 
org.apache.beam.vendor.calcite.v1_20_0.com.google.common.base.MoreObjects;
@@ -73,11 +88,12 @@ public class TextTableProvider extends InMemoryMetaTableProvider {
     String filePattern = table.getLocation();
     JSONObject properties = table.getProperties();
     String format = MoreObjects.firstNonNull(properties.getString("format"), 
"csv");
+    String deadLetterFile = properties.getString("deadLetterFile");
 
     // Backwards compatibility: previously "type": "text" meant CSV and 
"format" was where the
     // CSV format went. So assume that any other format is the CSV format.
     @Nullable String legacyCsvFormat = null;
-    if (!ImmutableSet.of("csv", "lines").contains(format)) {
+    if (!ImmutableSet.of("csv", "lines", "json").contains(format)) {
       legacyCsvFormat = format;
       format = "csv";
     }
@@ -93,6 +109,9 @@ public class TextTableProvider extends InMemoryMetaTableProvider {
                     : CSVFormat.DEFAULT);
         return new TextTable(
             schema, filePattern, new CsvToRow(schema, csvFormat), new 
RowToCsv(csvFormat));
+      case "json":
+        return new TextJsonTable(
+            schema, filePattern, JsonToRow.create(schema, deadLetterFile), 
RowToJson.create());
       case "lines":
         checkArgument(
             schema.getFieldCount() == 1
@@ -103,7 +122,7 @@ public class TextTableProvider extends InMemoryMetaTableProvider {
             schema, filePattern, new LinesReadConverter(), new 
LinesWriteConverter());
       default:
         throw new IllegalArgumentException(
-            "Table with type 'text' must have format 'csv' or 'lines'");
+            "Table with type 'text' must have format 'csv' or 'lines' or 
'json'");
     }
   }
 
@@ -141,6 +160,81 @@ public class TextTableProvider extends InMemoryMetaTableProvider {
     }
   }
 
+  /** Read-side converter for {@link TextJsonTable} with format {@code 
'json'}. */
+  @AutoValue
+  @Internal
+  abstract static class JsonToRow extends PTransform<PCollection<String>, 
PCollection<Row>>
+      implements Serializable {
+    protected static final TupleTag<String> DLF_TAG = new TupleTag<>();
+    protected static final TupleTag<Row> MAIN_TAG = new TupleTag<>();
+
+    public abstract Schema schema();
+
+    @Nullable
+    public abstract String deadLetterFile();
+
+    public static JsonToRow create(Schema schema, @Nullable String 
deadLetterFile) {
+      return new AutoValue_TextTableProvider_JsonToRow(schema, deadLetterFile);
+    }
+
+    public static JsonToRow create(Schema schema) {
+      return create(schema, null);
+    }
+
+    @Override
+    public PCollection<Row> expand(PCollection<String> input) {
+      PCollectionTuple rows =
+          input.apply(
+              ParDo.of(
+                      new DoFn<String, Row>() {
+                        @ProcessElement
+                        public void processElement(ProcessContext context) {
+                          try {
+                            context.output(jsonToRow(getObjectMapper(), 
context.element()));
+                          } catch (UnsupportedRowJsonException jsonException) {
+                            if (deadLetterFile() != null) {
+                              context.output(DLF_TAG, context.element());
+                            } else {
+                              throw new RuntimeException("Error parsing JSON", 
jsonException);
+                            }
+                          }
+                        }
+                      })
+                  .withOutputTags(
+                      MAIN_TAG,
+                      deadLetterFile() != null ? TupleTagList.of(DLF_TAG) : 
TupleTagList.empty()));
+
+      if (deadLetterFile() != null) {
+        
rows.get(DLF_TAG).setCoder(StringUtf8Coder.of()).apply(writeJsonToDlf());
+      }
+      return rows.get(MAIN_TAG).setRowSchema(schema());
+    }
+
+    private TextIO.Write writeJsonToDlf() {
+      return TextIO.write().withDelimiter(new char[] {}).to(deadLetterFile());
+    }
+
+    private ObjectMapper getObjectMapper() {
+      return newObjectMapperWith(RowJsonDeserializer.forSchema(schema()));
+    }
+  }
+
+  /**
+   * Write-side converter for {@link TextJsonTable} with format {@code 'json'}: renders every
+   * {@link Row} of the input as a JSON string using {@link ToJson}.
+   */
+  @AutoValue
+  @Internal
+  abstract static class RowToJson extends PTransform<PCollection<Row>, PCollection<String>>
+      implements Serializable {
+
+    /** Returns a new converter; it carries no configuration. */
+    public static RowToJson create() {
+      return new AutoValue_TextTableProvider_RowToJson();
+    }
+
+    @Override
+    public PCollection<String> expand(PCollection<Row> rows) {
+      ToJson<Row> rowsToJson = ToJson.of();
+      return rows.apply(rowsToJson);
+    }
+  }
+
   /** Write-side converter for {@link TextTable} with format {@code 'csv'}. */
   @VisibleForTesting
   static class RowToCsv extends PTransform<PCollection<Row>, 
PCollection<String>>
diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
index 172bdb1..1c1b6ba 100644
--- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
+++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/text/TextTableProviderTest.java
@@ -61,6 +61,12 @@ public class TextTableProviderTest {
   private static final Schema LINES_SCHEMA = 
Schema.builder().addStringField("f_string").build();
   private static final String SQL_LINES_SCHEMA = "(f_string VARCHAR)";
 
+  // Schema and sample records shared by the JSON-format tests.
+  private static final Schema JSON_SCHEMA =
+      Schema.builder().addStringField("name").addInt32Field("age").build();
+  private static final String SQL_JSON_SCHEMA = "(name VARCHAR, age INTEGER)";
+  private static final String JSON_TEXT = "{\"name\":\"Jack\",\"age\":13}";
+  // Well-formed JSON whose "age" is a string, so it does not fit JSON_SCHEMA's INT32 field.
+  private static final String INVALID_JSON_TEXT = "{\"name\":\"Jack\",\"age\":\"thirteen\"}";
+
   // Even though these have the same schema as LINES_SCHEMA, that is 
accidental; they exist for a
   // different purpose, to test Excel CSV format that does not ignore empty 
lines
   private static final Schema SINGLE_STRING_CSV_SCHEMA =
@@ -210,6 +216,49 @@ public class TextTableProviderTest {
   }
 
   @Test
+  public void testJson() throws Exception {
+    // One valid JSON record matching SQL_JSON_SCHEMA.
+    File jsonFile = tempFolder.newFile("test.json");
+    Files.write(jsonFile.toPath(), JSON_TEXT.getBytes(Charsets.UTF_8));
+
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+    String ddl =
+        String.format(
+            "CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*' "
+                + "TBLPROPERTIES '{\"format\":\"json\"}'",
+            SQL_JSON_SCHEMA, tempFolder.getRoot());
+    env.executeDdl(ddl);
+
+    Row expected = Row.withSchema(JSON_SCHEMA).addValues("Jack", 13).build();
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM test"));
+    PAssert.that(rows).containsInAnyOrder(expected);
+
+    pipeline.run();
+  }
+
+  @Test
+  public void testInvalidJson() throws Exception {
+    File jsonFile = tempFolder.newFile("test.json");
+    Files.write(jsonFile.toPath(), INVALID_JSON_TEXT.getBytes(Charsets.UTF_8));
+    File deadLetterFile = new File(tempFolder.getRoot(), "dead-letter-file").getAbsoluteFile();
+
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+    env.executeDdl(
+        String.format(
+            "CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s/*' "
+                + "TBLPROPERTIES '{\"format\":\"json\", \"deadLetterFile\": \"%s\"}'",
+            SQL_JSON_SCHEMA, tempFolder.getRoot(), deadLetterFile));
+
+    // The only input record is rejected, so the query yields no rows...
+    PCollection<Row> rows =
+        BeamSqlRelUtils.toPCollection(pipeline, env.parseQuery("SELECT * FROM test"));
+    PAssert.that(rows).empty();
+    pipeline.run();
+
+    // ...and the raw record lands in the dead-letter file instead.
+    NumberedShardedFile deadLetterShards = new NumberedShardedFile(deadLetterFile + "*");
+    assertThat(
+        deadLetterShards.readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
+        containsInAnyOrder(INVALID_JSON_TEXT));
+  }
+
+  @Test
   public void testWriteLines() throws Exception {
     File destinationFile = new File(tempFolder.getRoot(), "lines-outputs");
     BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
@@ -248,4 +297,23 @@ public class TextTableProviderTest {
             .readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
         containsInAnyOrder("hello,42", "goodbye,13"));
   }
+
+  @Test
+  public void testWriteJson() throws Exception {
+    String outputPath = new File(tempFolder.getRoot(), "json-outputs").getAbsolutePath();
+
+    BeamSqlEnv env = BeamSqlEnv.inMemory(new TextTableProvider());
+    String ddl =
+        String.format(
+            "CREATE EXTERNAL TABLE test %s TYPE text LOCATION '%s' "
+                + "TBLPROPERTIES '{\"format\":\"json\"}'",
+            SQL_JSON_SCHEMA, outputPath);
+    env.executeDdl(ddl);
+
+    // Insert one row; the written output is expected to be exactly JSON_TEXT.
+    BeamSqlRelUtils.toPCollection(
+        pipeline, env.parseQuery("INSERT INTO test(name, age) VALUES ('Jack', 13)"));
+    pipeline.run();
+
+    NumberedShardedFile writtenShards = new NumberedShardedFile(outputPath + "*");
+    assertThat(
+        writtenShards.readFilesWithRetries(Sleeper.DEFAULT, BackOff.STOP_BACKOFF),
+        containsInAnyOrder(JSON_TEXT));
+  }
 }

Reply via email to