[ 
https://issues.apache.org/jira/browse/BEAM-13945?focusedWorklogId=754087&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754087
 ]

ASF GitHub Bot logged work on BEAM-13945:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Apr/22 13:50
            Start Date: 07/Apr/22 13:50
    Worklog Time Spent: 10m 
      Work Description: ahmedabu98 commented on code in PR #17209:
URL: https://github.com/apache/beam/pull/17209#discussion_r845160161


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJSONIT.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@RunWith(JUnit4.class)
+public class BigQueryIOJSONIT {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BigQueryIOJSONIT.class);
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Rule
+  public transient TestPipeline p_write = TestPipeline.create();
+
+  private BigQueryIOJSONOptions options;
+
+  private static String project;
+
+  private static final String DATASET_ID = "bq_jsontype_test_nodelete";
+
+  private static final String JSON_TYPE_TABLE_NAME = "json_data";
+
+  private static String JSON_TABLE_DESTINATION;
+
+  private static final TableSchema JSON_TYPE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(ImmutableList.of(
+              new TableFieldSchema().setName("country_code").setType("STRING"),
+              new TableFieldSchema().setName("country").setType("JSON")
+          ));
+
+  public static final String STORAGE_WRITE_TEST_TABLE = "storagewrite_test"
+      + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
+
+  private static final Map<String, String> JSON_TYPE_DATA = 
generateCountryData(false);
+
+  // Convert PCollection of TableRows to a PCollection of KV JSON string pairs
+  static class TableRowToJSONStringFn extends DoFn<TableRow, KV<String, 
String>> {
+    @ProcessElement
+    public void processElement(@Element TableRow row, 
OutputReceiver<KV<String, String>> out){
+      String country_code = row.get("country_code").toString();
+      String country = row.get("country").toString();
+
+      out.output(KV.of(country_code, country));
+    }
+  }
+
+  // Compare PCollection input with expected results.
+  static class CompareJSON implements SerializableFunction<Iterable<KV<String, 
String>>, Void> {
+    Map<String, String> expected;
+    public CompareJSON(Map<String, String> expected){
+      this.expected = expected;
+    }
+
+    @Override
+    public Void apply(Iterable<KV<String, String>> input) throws 
RuntimeException {
+      int counter = 0;
+
+      // Iterate through input list and convert each String to JsonElement
+      // Compare with expected result JsonElements
+      for(KV<String, String> actual: input){
+        String key = actual.getKey();
+
+        if(!expected.containsKey(key)){
+          throw new NoSuchElementException(String.format(
+              "Unexpected key '%s' found in input but does not exist in 
expected results.", key));
+        }
+        String jsonStringActual = actual.getValue();
+        JsonElement jsonActual = JsonParser.parseString(jsonStringActual);
+
+        String jsonStringExpected = expected.get(key);
+        JsonElement jsonExpected = JsonParser.parseString(jsonStringExpected);
+
+        assertEquals(jsonExpected, jsonActual);
+        counter += 1;
+      }
+      if(counter != expected.size()){
+        throw new RuntimeException(String.format(
+            "Expected %d elements but got %d elements.", expected.size(), 
counter));
+      }
+      return null;

Review Comment:
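   IIUC, SerializableFunction requires an output type, and the comparison function 
needs to implement SerializableFunction for PAssert.that().satisfies() to 
accept it. satisfies() expects a SerializableFunction<Iterable<T>, Void>, which 
is why apply() declares Void and ends with `return null`.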





Issue Time Tracking
-------------------

    Worklog Id:     (was: 754087)
    Time Spent: 1h 40m  (was: 1.5h)

> Update BQ connector to support new JSON type
> --------------------------------------------
>
>                 Key: BEAM-13945
>                 URL: https://issues.apache.org/jira/browse/BEAM-13945
>             Project: Beam
>          Issue Type: New Feature
>          Components: io-java-gcp
>            Reporter: Chamikara Madhusanka Jayalath
>            Assignee: Ahmed Abualsaud
>            Priority: P2
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> BQ has a new JSON type that is defined here: 
> https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
> We should update the Beam BQ Java and Python connectors to support it for 
> the various read methods (export jobs, storage API) and write methods (load jobs, 
> streaming inserts, storage API).
> We should also add integration tests that exercise reading from/writing to 
> BQ tables with columns that have the JSON type.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to