[ 
https://issues.apache.org/jira/browse/BEAM-13945?focusedWorklogId=756018&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-756018
 ]

ASF GitHub Bot logged work on BEAM-13945:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 12/Apr/22 20:31
            Start Date: 12/Apr/22 20:31
    Worklog Time Spent: 10m 
      Work Description: chamikaramj commented on code in PR #17209:
URL: https://github.com/apache/beam/pull/17209#discussion_r848844479


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJSONIT.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@RunWith(JUnit4.class)
+public class BigQueryIOJSONIT {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOJSONIT.class);
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Rule
+  public transient TestPipeline p_write = TestPipeline.create();
+
+  private BigQueryIOJSONOptions options;
+
+  private static String project;
+
+  private static final String DATASET_ID = "bq_jsontype_test_nodelete";
+
+  private static final String JSON_TYPE_TABLE_NAME = "json_data";
+
+  private static String JSON_TABLE_DESTINATION;
+
+  private static final TableSchema JSON_TYPE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(ImmutableList.of(
+              new TableFieldSchema().setName("country_code").setType("STRING"),
+              new TableFieldSchema().setName("country").setType("JSON")
+          ));
+
+  public static final String STORAGE_WRITE_TEST_TABLE = "storagewrite_test"
+      + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
+
+  private static final Map<String, String> JSON_TYPE_DATA = generateCountryData(false);
+
+  // Convert PCollection of TableRows to a PCollection of KV JSON string pairs
+  static class TableRowToJSONStringFn extends DoFn<TableRow, KV<String, String>> {
+    @ProcessElement
+    public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> out){
+      String country_code = row.get("country_code").toString();
+      String country = row.get("country").toString();
+
+      out.output(KV.of(country_code, country));
+    }
+  }
+
+  // Compare PCollection input with expected results.
+  static class CompareJSON implements SerializableFunction<Iterable<KV<String, String>>, Void> {
+    Map<String, String> expected;
+    public CompareJSON(Map<String, String> expected){
+      this.expected = expected;
+    }
+
+    @Override
+    public Void apply(Iterable<KV<String, String>> input) throws RuntimeException {
+      int counter = 0;
+
+      // Iterate through input list and convert each String to JsonElement
+      // Compare with expected result JsonElements
+      for(KV<String, String> actual: input){
+        String key = actual.getKey();
+
+        if(!expected.containsKey(key)){
+          throw new NoSuchElementException(String.format(
+              "Unexpected key '%s' found in input but does not exist in 
expected results.", key));
+        }
+        String jsonStringActual = actual.getValue();
+        JsonElement jsonActual = JsonParser.parseString(jsonStringActual);
+
+        String jsonStringExpected = expected.get(key);
+        JsonElement jsonExpected = JsonParser.parseString(jsonStringExpected);
+
+        assertEquals(jsonExpected, jsonActual);
+        counter += 1;
+      }
+      if(counter != expected.size()){
+        throw new RuntimeException(String.format(
+            "Expected %d elements but got %d elements.", expected.size(), 
counter));
+      }
+      return null;
+    }
+  }
+
+  public void runTestWrite(BigQueryIOJSONOptions options){
+    List<TableRow> rowsToWrite = new ArrayList<>();
+    for(Map.Entry<String, String> element: JSON_TYPE_DATA.entrySet()){
+      rowsToWrite.add(new TableRow()
+          .set("country_code", element.getKey())
+          .set("country", element.getValue()));
+    }
+
+    p_write
+        .apply("Create Elements", Create.of(rowsToWrite))
+        .apply("Write To BigQuery",
+            BigQueryIO.writeTableRows()
+                .to(options.getOutput())
+                .withSchema(JSON_TYPE_TABLE_SCHEMA)
+                .withCreateDisposition(options.getCreateDisposition())
+                .withMethod(options.getWriteMethod()));
+    p_write.run().waitUntilFinish();
+
+    options.setReadMethod(TypedRead.Method.EXPORT);
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  // Reads TableRows from BigQuery and validates the JSON strings.
+  // Values in expectedResults must be valid JSON.
+  public void readAndValidateRows(BigQueryIOJSONOptions options, Map<String, String> expectedResults){
+    TypedRead<TableRow> bigqueryIO =
+        BigQueryIO.readTableRows().withMethod(options.getReadMethod());
+
+    // read from input query or from table
+    if(!options.getQuery().isEmpty()) {
+      bigqueryIO = bigqueryIO.fromQuery(options.getQuery()).usingStandardSql();
+    } else {
+      bigqueryIO = bigqueryIO.from(options.getInput());
+    }
+
+    PCollection<KV<String, String>> jsonKVPairs = p
+        .apply("Read rows", bigqueryIO)
+        .apply("Convert to KV JSON Strings", ParDo.of(new 
TableRowToJSONStringFn()));
+
+    PAssert.that(jsonKVPairs).satisfies(new CompareJSON(expectedResults));
+
+    p.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testDirectRead() throws Exception {
+    LOG.info("Testing DIRECT_READ read method with JSON data");
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.DIRECT_READ);
+    options.setInput(JSON_TABLE_DESTINATION);
+
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  @Test
+  public void testExportRead() throws Exception {
+    LOG.info("Testing EXPORT read method with JSON data");
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.EXPORT);
+    options.setInput(JSON_TABLE_DESTINATION);
+
+    readAndValidateRows(options, JSON_TYPE_DATA);
+  }
+
+  @Test
+  public void testQueryRead() throws Exception {
+    LOG.info("Testing querying JSON data with DIRECT_READ read method");
+
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setReadMethod(TypedRead.Method.DIRECT_READ);
+    options.setQuery(
+        String.format("SELECT country_code, country.cities AS country FROM "
+            + "`%s.%s.%s`", project, DATASET_ID, JSON_TYPE_TABLE_NAME));
+
+    // get nested json objects from static data
+    Map<String, String> expected = generateCountryData(true);
+
+    readAndValidateRows(options, expected);
+  }
+
+  @Test
+  public void testStorageWrite() throws Exception{
+    LOG.info("Testing writing JSON data with Storage API");
+
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOJSONOptions.class);
+    options.setWriteMethod(Write.Method.STORAGE_WRITE_API);
+
+    String storage_destination = String.format("%s:%s.%s", project, DATASET_ID, STORAGE_WRITE_TEST_TABLE);
+    options.setOutput(storage_destination);
+    options.setInput(storage_destination);
+
+    runTestWrite(options);
+  }
+
+  @Test
+  public void testLegacyStreamingWrite() throws Exception{

Review Comment:
   Should we add this to the Java docs?
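   For reference, if this does get documented, a usage snippet of roughly this
   shape could accompany the BigQueryIO Javadoc. This is a minimal write-side
   sketch mirroring the test above; the project/dataset/table reference and the
   wrapper class are illustrative, not part of the PR:

   import com.google.api.services.bigquery.model.TableFieldSchema;
   import com.google.api.services.bigquery.model.TableRow;
   import com.google.api.services.bigquery.model.TableSchema;
   import com.google.common.collect.ImmutableList;
   import org.apache.beam.sdk.Pipeline;
   import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
   import org.apache.beam.sdk.transforms.Create;

   public class JsonColumnWriteSketch {
     public static void main(String[] args) {
       Pipeline pipeline = Pipeline.create();

       // A JSON column is declared by setting the field type to "JSON";
       // the value itself is supplied as a JSON-formatted String.
       TableSchema schema =
           new TableSchema()
               .setFields(ImmutableList.of(
                   new TableFieldSchema().setName("country_code").setType("STRING"),
                   new TableFieldSchema().setName("country").setType("JSON")));

       TableRow row = new TableRow()
           .set("country_code", "US")
           .set("country", "{\"name\": \"United States\"}");

       pipeline
           .apply("Create row", Create.of(row))
           .apply("Write to BigQuery",
               BigQueryIO.writeTableRows()
                   .to("my-project:my_dataset.json_table")  // hypothetical table
                   .withSchema(schema)
                   .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API));

       pipeline.run().waitUntilFinish();
     }
   }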





Issue Time Tracking
-------------------

    Worklog Id:     (was: 756018)
    Time Spent: 3h 50m  (was: 3h 40m)

> Update BQ connector to support new JSON type
> --------------------------------------------
>
>                 Key: BEAM-13945
>                 URL: https://issues.apache.org/jira/browse/BEAM-13945
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Chamikara Madhusanka Jayalath
>            Assignee: Ahmed Abualsaud
>            Priority: P2
>          Time Spent: 3h 50m
>  Remaining Estimate: 0h
>
> BQ has a new JSON type that is defined here: 
> https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
> We should update the Beam BQ Java and Python connectors to support it for 
> various read methods (export jobs, Storage API) and write methods (load jobs, 
> streaming inserts, Storage API).
> We should also add integration tests that exercise reading from / writing to 
> BQ tables with columns that have the JSON type.
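
For context on the read side, a minimal sketch of what consuming a JSON column
looks like, mirroring the read path exercised in the test above: the JSON
column comes back as a String field of the TableRow, which downstream code can
parse with any JSON library (the test uses Gson). The table reference and
wrapper class here are illustrative:

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class JsonColumnReadSketch {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create();

    // The JSON column arrives as a String inside each TableRow.
    PCollection<String> countryJson =
        pipeline
            .apply("Read rows",
                BigQueryIO.readTableRows()
                    .from("my-project:my_dataset.json_table")  // hypothetical table
                    .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ))
            .apply("Extract JSON strings",
                MapElements.into(TypeDescriptors.strings())
                    .via((TableRow row) -> row.get("country").toString()));

    pipeline.run().waitUntilFinish();
  }
}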



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
