TheNeuralBit commented on a change in pull request #12341:
URL: https://github.com/apache/beam/pull/12341#discussion_r461252200



##########
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreV1SchemaIOProviderTest {
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  private DataStoreV1SchemaIOProvider provider = new 
DataStoreV1SchemaIOProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.identifier());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() {
+    final String location = "projectId/batch_kind";
+    Row configuration = 
Row.withSchema(generateRowSchema()).addValue(null).build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, 
generateDataSchema());
+
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals(DEFAULT_KEY_FIELD, dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty() {
+    final String location = "projectId/batch_kind";
+
+    Row configuration = 
Row.withSchema(generateRowSchema()).addValue("field_name").build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, 
generateDataSchema());
+
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals("field_name", dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty_nullValue_throwsException() {
+    final String location = "projectId/batch_kind";
+    Row configuration = 
Row.withSchema(generateRowSchema()).addValue("").build();
+
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            (new DataStoreV1SchemaIOProvider())
+                .from(location, configuration, generateDataSchema()));
+  }
+
+  private static Schema generateDataSchema() {
+    return Schema.builder()
+        .addNullableField("id", Schema.FieldType.INT32)
+        .addNullableField("name", Schema.FieldType.STRING)
+        .build();
+  }
+
+  private Schema generateRowSchema() {
+    return Schema.builder().addNullableField(KEY_FIELD_PROPERTY, 
Schema.FieldType.STRING).build();
+  }

Review comment:
       I think it would be better to get rid of this function and call 
`provider.configurationSchema()` instead.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java
##########
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code PTransform} to perform a conversion of {@code PCollection<Row>} to 
{@code
+ * PCollection<Entity>}.

Review comment:
       ```suggestion
    * A {@code PTransform} to perform a conversion of {@link Row} to {@link 
Entity}.
   ```

##########
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreV1SchemaIOProviderTest {
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  private DataStoreV1SchemaIOProvider provider = new 
DataStoreV1SchemaIOProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.identifier());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() {
+    final String location = "projectId/batch_kind";
+    Row configuration = 
Row.withSchema(generateRowSchema()).addValue(null).build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, 
generateDataSchema());
+
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals(DEFAULT_KEY_FIELD, dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty() {
+    final String location = "projectId/batch_kind";
+
+    Row configuration = 
Row.withSchema(generateRowSchema()).addValue("field_name").build();

Review comment:
       I think we're now preferring to use `withFieldValue` rather than `addValue` 
when constructing Row instances, e.g.:
   `Row.withSchema(provider.configurationSchema()).withFieldValue("keyField", 
"field_name")`
   
   See the examples in Row.java: 
https://github.com/apache/beam/blob/956e4eb39a7fedbae05985c759284557dcc3d9ec/sdks/java/core/src/main/java/org/apache/beam/sdk/values/Row.java#L64-L74
   

##########
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java
##########
@@ -74,7 +73,7 @@
           .addNullableField("rowArray", 
array(FieldType.row(NESTED_ROW_SCHEMA)))
           .addNullableField("double", DOUBLE)
           .addNullableField("bytes", BYTES)
-          .addNullableField("string", CalciteUtils.CHAR)
+          .addNullableField("string", CHAR)

Review comment:
       Will this work if we just use `STRING`? (Rather than re-creating 
`CalciteUtils.CHAR` here)

##########
File path: 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreV1SchemaIOProviderTest {
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  private DataStoreV1SchemaIOProvider provider = new 
DataStoreV1SchemaIOProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.identifier());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() {
+    final String location = "projectId/batch_kind";
+    Row configuration = 
Row.withSchema(generateRowSchema()).addValue(null).build();
+    SchemaIO schemaIO =
+        (new DataStoreV1SchemaIOProvider()).from(location, configuration, 
generateDataSchema());

Review comment:
       I think you could just re-use `provider` here and in the other tests. If 
you're worried about re-using the same instance in all the tests you could 
consider initializing `provider` in an `@Before` method instead of initializing 
statically so that each test will get a fresh instance.

##########
File path: 
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java
##########
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code PTransform} to perform a conversion of {@code PCollection<Entity>} 
to {@code
+ * PCollection<Row>}.

Review comment:
       ```suggestion
    * A {@code PTransform} to perform a conversion of {@link Entity} to {@link 
Row}.
   ```

##########
File path: 
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
##########
@@ -39,15 +43,14 @@
  * }</pre>
  */
 @AutoService(TableProvider.class)
-public class DataStoreV1TableProvider extends InMemoryMetaTableProvider {
-
+public class DataStoreV1TableProvider extends SchemaIOTableProviderWrapper {
   @Override
-  public String getTableType() {
-    return "datastoreV1";
+  public SchemaIOProvider getSchemaIOProvider() {
+    return new DataStoreV1SchemaIOProvider();
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table table) {
-    return new DataStoreV1Table(table);
+  public String getTableType() {
+    return "datastoreV1";

Review comment:
       Should this also have an implementation for getTableStatistics? It looks 
like DataStoreV1Table had a non-standard implementation:
   ```
     @Override  
     public BeamTableStatistics getTableStatistics(PipelineOptions options) {   
       long count =     
           
DatastoreIO.v1().read().withProjectId(projectId).getNumEntities(options, kind, 
null);        
   
       if (count < 0) { 
         return BeamTableStatistics.BOUNDED_UNKNOWN;    
       }        
   
       return BeamTableStatistics.createBoundedTableStatistics((double) count); 
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to