[
https://issues.apache.org/jira/browse/BEAM-13945?focusedWorklogId=754087&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-754087
]
ASF GitHub Bot logged work on BEAM-13945:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 07/Apr/22 13:50
Start Date: 07/Apr/22 13:50
Worklog Time Spent: 10m
Work Description: ahmedabu98 commented on code in PR #17209:
URL: https://github.com/apache/beam/pull/17209#discussion_r845160161
##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOJSONIT.java:
##########
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import com.google.common.collect.ImmutableList;
+import com.google.gson.JsonElement;
+import com.google.gson.JsonParser;
+import java.security.SecureRandom;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.json.JSONArray;
+import org.json.JSONObject;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+@RunWith(JUnit4.class)
+public class BigQueryIOJSONIT {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryIOJSONIT.class);
+
+  @Rule
+  public final transient TestPipeline p = TestPipeline.create();
+
+  @Rule
+  public transient TestPipeline p_write = TestPipeline.create();
+
+  private BigQueryIOJSONOptions options;
+
+  private static String project;
+
+  private static final String DATASET_ID = "bq_jsontype_test_nodelete";
+
+  private static final String JSON_TYPE_TABLE_NAME = "json_data";
+
+  private static String JSON_TABLE_DESTINATION;
+
+  private static final TableSchema JSON_TYPE_TABLE_SCHEMA =
+      new TableSchema()
+          .setFields(ImmutableList.of(
+              new TableFieldSchema().setName("country_code").setType("STRING"),
+              new TableFieldSchema().setName("country").setType("JSON")
+          ));
+
+  public static final String STORAGE_WRITE_TEST_TABLE = "storagewrite_test"
+      + System.currentTimeMillis() + "_" + new SecureRandom().nextInt(32);
+
+  private static final Map<String, String> JSON_TYPE_DATA = generateCountryData(false);
+
+  // Convert PCollection of TableRows to a PCollection of KV JSON string pairs
+  static class TableRowToJSONStringFn extends DoFn<TableRow, KV<String, String>> {
+    @ProcessElement
+    public void processElement(@Element TableRow row, OutputReceiver<KV<String, String>> out) {
+      String country_code = row.get("country_code").toString();
+      String country = row.get("country").toString();
+
+      out.output(KV.of(country_code, country));
+    }
+  }
+
+  // Compare PCollection input with expected results.
+  static class CompareJSON implements SerializableFunction<Iterable<KV<String, String>>, Void> {
+    Map<String, String> expected;
+    public CompareJSON(Map<String, String> expected) {
+      this.expected = expected;
+    }
+
+    @Override
+    public Void apply(Iterable<KV<String, String>> input) throws RuntimeException {
+      int counter = 0;
+
+      // Iterate through input list and convert each String to JsonElement
+      // Compare with expected result JsonElements
+      for (KV<String, String> actual : input) {
+        String key = actual.getKey();
+
+        if (!expected.containsKey(key)) {
+          throw new NoSuchElementException(String.format(
+              "Unexpected key '%s' found in input but does not exist in expected results.", key));
+        }
+        String jsonStringActual = actual.getValue();
+        JsonElement jsonActual = JsonParser.parseString(jsonStringActual);
+
+        String jsonStringExpected = expected.get(key);
+        JsonElement jsonExpected = JsonParser.parseString(jsonStringExpected);
+
+        assertEquals(jsonExpected, jsonActual);
+        counter += 1;
+      }
+      if (counter != expected.size()) {
+        throw new RuntimeException(String.format(
+            "Expected %d elements but got %d elements.", expected.size(), counter));
+      }
+      return null;
Review Comment:
IIUC, SerializableFunction requires an output type, and the compare method
needs to implement SerializableFunction for PAssert.that().satisfies() to
accept it.
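
For reference, a minimal sketch of that pattern (illustrative only, not code from
this PR, and reusing imports already present in this test file): PAssert hands the
full PCollection contents to a SerializableFunction<Iterable<T>, Void>, which is why
the checker declares Void as its output type and returns null once all checks pass.

// Illustrative sketch: assert on a PCollection of KV<String, String> JSON pairs.
@Test
public void testSatisfiesTakesSerializableFunction() {
  PCollection<KV<String, String>> result =
      p.apply(Create.of(KV.of("usa", "{\"name\": \"united states\"}")));

  PAssert.that(result)
      .satisfies(
          (SerializableFunction<Iterable<KV<String, String>>, Void>)
              input -> {
                for (KV<String, String> kv : input) {
                  // Compare parsed JSON so formatting and key order don't matter.
                  assertEquals(
                      JsonParser.parseString("{\"name\": \"united states\"}"),
                      JsonParser.parseString(kv.getValue()));
                }
                return null; // Void output type: return null after all checks pass.
              });

  p.run().waitUntilFinish();
}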
Issue Time Tracking
-------------------
Worklog Id: (was: 754087)
Time Spent: 1h 40m (was: 1.5h)
> Update BQ connector to support new JSON type
> --------------------------------------------
>
> Key: BEAM-13945
> URL: https://issues.apache.org/jira/browse/BEAM-13945
> Project: Beam
> Issue Type: New Feature
> Components: io-java-gcp
> Reporter: Chamikara Madhusanka Jayalath
> Assignee: Ahmed Abualsaud
> Priority: P2
> Time Spent: 1h 40m
> Remaining Estimate: 0h
>
> BQ has a new JSON type that is defined here:
> https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#json_type
> We should update Beam BQ Java and Python connectors to support that for
> various read methods (export jobs, storage API) and write methods (load jobs,
> streaming inserts, storage API).
> We should also add integration tests that exercise reading from / writing to
> BQ tables with columns that have the JSON type.
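
(Not part of the issue text above: a rough, hypothetical sketch of what writing to a
table with a JSON column could look like from the Java SDK, assuming the connector
accepts the JSON value as a plain String inside the TableRow and assuming an existing
Pipeline named pipeline; the project, dataset, and table names are made up.)

// Hypothetical schema with a JSON-typed column.
TableSchema schema =
    new TableSchema()
        .setFields(
            ImmutableList.of(
                new TableFieldSchema().setName("country_code").setType("STRING"),
                new TableFieldSchema().setName("country").setType("JSON")));

// The JSON value is carried as a String inside the TableRow.
TableRow row =
    new TableRow()
        .set("country_code", "us")
        .set("country", "{\"name\": \"united states\", \"population\": 331000000}");

pipeline
    .apply(Create.of(row).withCoder(TableRowJsonCoder.of()))
    .apply(
        BigQueryIO.writeTableRows()
            .to("my-project:my_dataset.json_type_table") // hypothetical destination
            .withSchema(schema)
            .withMethod(BigQueryIO.Write.Method.STORAGE_WRITE_API)
            .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED));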
--
This message was sent by Atlassian Jira
(v8.20.1#820001)