This is an automated email from the ASF dual-hosted git repository.

bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new ab12f12  [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests (#12341)
ab12f12 is described below

commit ab12f121fd0416defc3752980d2c2daa8e91fb0c
Author: sclukas77 <[email protected]>
AuthorDate: Tue Aug 18 17:32:26 2020 +0000

    [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests (#12341)
    
    * Implemented SchemaIOProvider for DataStoreV1, refactored tests
    
    * Modified SchemaIOTableProviderWrapper#getTableStatistics
    
    * Update sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
    
    Co-authored-by: Scott Lukas <[email protected]>
    Co-authored-by: Brian Hulette <[email protected]>
---
 .../sdk/schemas/io/InvalidLocationException.java   |  39 ++
 .../provider/SchemaIOTableProviderWrapper.java     |   4 +-
 .../meta/provider/datastore/DataStoreV1Table.java  | 435 ---------------------
 .../datastore/DataStoreV1TableProvider.java        |  37 +-
 .../provider/datastore/DataStoreReadWriteIT.java   |   4 +-
 .../datastore/DataStoreTableProviderTest.java      | 112 ------
 .../gcp/datastore/DataStoreV1SchemaIOProvider.java | 179 +++++++++
 .../beam/sdk/io/gcp/datastore/EntityToRow.java     | 161 ++++++++
 .../beam/sdk/io/gcp/datastore/RowToEntity.java     | 202 ++++++++++
 .../datastore/DataStoreV1SchemaIOProviderTest.java | 104 +++++
 .../gcp/datastore/EntityToRowRowToEntityTest.java} |  23 +-
 11 files changed, 733 insertions(+), 567 deletions(-)

diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/InvalidLocationException.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/InvalidLocationException.java
new file mode 100644
index 0000000..4cfb5e9
--- /dev/null
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/InvalidLocationException.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.io;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+
+/**
+ * Exception thrown when the location of a {@link SchemaIO} is invalid.
+ *
+ * <p>This is an unchecked exception (it extends {@link IllegalArgumentException}). For invalid
+ * configuration other than the location, see {@link InvalidConfigurationException}.
+ */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class InvalidLocationException extends IllegalArgumentException {
+  /**
+   * Creates an exception with the given detail message.
+   *
+   * @param msg description of why the location is invalid
+   */
+  public InvalidLocationException(String msg) {
+    super(msg);
+  }
+
+  /**
+   * Creates an exception with the given detail message and cause.
+   *
+   * @param msg description of why the location is invalid
+   * @param cause the underlying exception that led to the location being rejected
+   */
+  public InvalidLocationException(String msg, Throwable cause) {
+    super(msg, cause);
+  }
+
+  /**
+   * Creates an exception wrapping the given cause, using the cause's description as the message.
+   *
+   * @param cause the underlying exception that led to the location being rejected
+   */
+  public InvalidLocationException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
index c5044a5..e8292fb 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
@@ -80,7 +80,7 @@ public abstract class SchemaIOTableProviderWrapper extends 
InMemoryMetaTableProv
     }
   }
 
-  private BeamTableStatistics getTableStatistics(PipelineOptions options) {
+  protected BeamTableStatistics getTableStatistics(PipelineOptions options, 
SchemaIO schemaIO) {
     if (isBounded().equals(PCollection.IsBounded.BOUNDED)) {
       return BeamTableStatistics.BOUNDED_UNKNOWN;
     }
@@ -123,7 +123,7 @@ public abstract class SchemaIOTableProviderWrapper extends 
InMemoryMetaTableProv
 
     @Override
     public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-      return SchemaIOTableProviderWrapper.this.getTableStatistics(options);
+      return SchemaIOTableProviderWrapper.this.getTableStatistics(options, 
schemaIO);
     }
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java
deleted file mode 100644
index 7efdb13..0000000
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
-
-import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
-
-import com.alibaba.fastjson.JSONObject;
-import com.google.datastore.v1.Entity;
-import com.google.datastore.v1.Query;
-import com.google.datastore.v1.Value;
-import com.google.datastore.v1.Value.ValueTypeCase;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Supplier;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
-import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
-import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.Row;
-import 
org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting;
-import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Internal
-@Experimental
-class DataStoreV1Table extends SchemaBaseBeamTable implements Serializable {
-  public static final String KEY_FIELD_PROPERTY = "keyField";
-  @VisibleForTesting static final String DEFAULT_KEY_FIELD = "__key__";
-  private static final Logger LOG = 
LoggerFactory.getLogger(DataStoreV1Table.class);
-  // Should match: `projectId/kind`.
-  private static final Pattern locationPattern = 
Pattern.compile("(?<projectId>.+)/(?<kind>.+)");
-  @VisibleForTesting final String keyField;
-  @VisibleForTesting final String projectId;
-  @VisibleForTesting final String kind;
-
-  DataStoreV1Table(Table table) {
-    super(table.getSchema());
-
-    // TODO: allow users to specify a name of the field to store a key value 
via TableProperties.
-    JSONObject properties = table.getProperties();
-    if (properties.containsKey(KEY_FIELD_PROPERTY)) {
-      String field = properties.getString(KEY_FIELD_PROPERTY);
-      if (!(field != null && !field.isEmpty())) {
-        throw new InvalidTableException(
-            String.format("'%s' property cannot be null.", 
KEY_FIELD_PROPERTY));
-      }
-      keyField = field;
-    } else {
-      keyField = DEFAULT_KEY_FIELD;
-    }
-    // TODO: allow users to specify a namespace in a location string.
-    String location = table.getLocation();
-    if (location == null) {
-      throw new InvalidTableException("DataStoreV1 location must be set: " + 
table);
-    }
-    Matcher matcher = locationPattern.matcher(location);
-
-    if (!matcher.matches()) {
-      throw new InvalidTableException(
-          "DataStoreV1 location must be in the following format: 
'projectId/kind'"
-              + " but was:"
-              + location);
-    }
-
-    this.projectId = matcher.group("projectId");
-    this.kind = matcher.group("kind");
-  }
-
-  @Override
-  public PCollection<Row> buildIOReader(PBegin begin) {
-    Query.Builder q = Query.newBuilder();
-    q.addKindBuilder().setName(kind);
-    Query query = q.build();
-
-    DatastoreV1.Read readInstance =
-        DatastoreIO.v1().read().withProjectId(projectId).withQuery(query);
-
-    return begin
-        .apply("Read Datastore Entities", readInstance)
-        .apply("Convert Datastore Entities to Rows", 
EntityToRow.create(getSchema(), keyField));
-  }
-
-  @Override
-  public POutput buildIOWriter(PCollection<Row> input) {
-    return input
-        .apply("Convert Rows to Datastore Entities", 
RowToEntity.create(keyField, kind))
-        .apply("Write Datastore Entities", 
DatastoreIO.v1().write().withProjectId(projectId));
-  }
-
-  @Override
-  public IsBounded isBounded() {
-    return IsBounded.BOUNDED;
-  }
-
-  @Override
-  public BeamTableStatistics getTableStatistics(PipelineOptions options) {
-    long count =
-        
DatastoreIO.v1().read().withProjectId(projectId).getNumEntities(options, kind, 
null);
-
-    if (count < 0) {
-      return BeamTableStatistics.BOUNDED_UNKNOWN;
-    }
-
-    return BeamTableStatistics.createBoundedTableStatistics((double) count);
-  }
-
-  /**
-   * A {@code PTransform} to perform a conversion of {@code 
PCollection<Entity>} to {@code
-   * PCollection<Row>}.
-   */
-  public static class EntityToRow extends PTransform<PCollection<Entity>, 
PCollection<Row>> {
-    private final Schema schema;
-    private final String keyField;
-
-    private EntityToRow(Schema schema, String keyField) {
-      this.schema = schema;
-      this.keyField = keyField;
-
-      if (schema.getFieldNames().contains(keyField)) {
-        if 
(!schema.getField(keyField).getType().getTypeName().equals(TypeName.BYTES)) {
-          throw new IllegalStateException(
-              "Field `"
-                  + keyField
-                  + "` should of type `VARBINARY`. Please change the type or 
specify a field to"
-                  + " store the KEY value.");
-        }
-        LOG.info("Entity KEY will be stored under `" + keyField + "` field.");
-      }
-    }
-
-    /**
-     * Create a PTransform instance.
-     *
-     * @param schema {@code Schema} of the target row.
-     * @param keyField A name of the row field to store the {@code Key} in.
-     * @return {@code PTransform} instance for Entity to Row conversion.
-     */
-    public static EntityToRow create(Schema schema, String keyField) {
-      return new EntityToRow(schema, keyField);
-    }
-
-    @Override
-    public PCollection<Row> expand(PCollection<Entity> input) {
-      return input.apply(ParDo.of(new 
EntityToRowConverter())).setRowSchema(schema);
-    }
-
-    @VisibleForTesting
-    class EntityToRowConverter extends DoFn<Entity, Row> {
-
-      @DoFn.ProcessElement
-      public void processElement(ProcessContext context) {
-        Entity entity = context.element();
-        ImmutableMap.Builder<String, Value> mapBuilder = 
ImmutableMap.builder();
-        mapBuilder.put(keyField, makeValue(entity.getKey()).build());
-        mapBuilder.putAll(entity.getPropertiesMap());
-
-        context.output(extractRowFromProperties(schema, mapBuilder.build()));
-      }
-
-      /**
-       * Convert DataStore {@code Value} to Beam type.
-       *
-       * @param currentFieldType Beam {@code Schema.FieldType} to convert to 
(used for {@code Row}
-       *     and {@code Array}).
-       * @param val DataStore {@code Value}.
-       * @return resulting Beam type.
-       */
-      private Object convertValueToObject(FieldType currentFieldType, Value 
val) {
-        ValueTypeCase typeCase = val.getValueTypeCase();
-
-        switch (typeCase) {
-          case NULL_VALUE:
-          case VALUETYPE_NOT_SET:
-            return null;
-          case BOOLEAN_VALUE:
-            return val.getBooleanValue();
-          case INTEGER_VALUE:
-            return val.getIntegerValue();
-          case DOUBLE_VALUE:
-            return val.getDoubleValue();
-          case TIMESTAMP_VALUE:
-            com.google.protobuf.Timestamp time = val.getTimestampValue();
-            long millis = time.getSeconds() * 1000 + time.getNanos() / 1000;
-            return Instant.ofEpochMilli(millis).toDateTime();
-          case STRING_VALUE:
-            return val.getStringValue();
-          case KEY_VALUE:
-            return val.getKeyValue().toByteArray();
-          case BLOB_VALUE:
-            return val.getBlobValue().toByteArray();
-          case ENTITY_VALUE:
-            // Recursive mapping for row type.
-            Schema rowSchema = currentFieldType.getRowSchema();
-            assert rowSchema != null;
-            Entity entity = val.getEntityValue();
-            return extractRowFromProperties(rowSchema, 
entity.getPropertiesMap());
-          case ARRAY_VALUE:
-            // Recursive mapping for collection type.
-            FieldType elementType = 
currentFieldType.getCollectionElementType();
-            List<Value> valueList = val.getArrayValue().getValuesList();
-            return valueList.stream()
-                .map(v -> convertValueToObject(elementType, v))
-                .collect(Collectors.toList());
-          case GEO_POINT_VALUE:
-          default:
-            throw new IllegalStateException(
-                "No conversion exists from type: "
-                    + val.getValueTypeCase().name()
-                    + " to Beam type.");
-        }
-      }
-
-      /**
-       * Converts all properties of an {@code Entity} to Beam {@code Row}.
-       *
-       * @param schema Target row {@code Schema}.
-       * @param values A map of property names and values.
-       * @return resulting Beam {@code Row}.
-       */
-      private Row extractRowFromProperties(Schema schema, Map<String, Value> 
values) {
-        Row.Builder builder = Row.withSchema(schema);
-        // It is not a guarantee that the values will be in the same order as 
the schema.
-        // Maybe metadata:
-        // 
https://cloud.google.com/appengine/docs/standard/python/datastore/metadataqueries
-        // TODO: figure out in what order the elements are in (without relying 
on Beam schema).
-        for (Schema.Field field : schema.getFields()) {
-          Value val = values.get(field.getName());
-          builder.addValue(convertValueToObject(field.getType(), val));
-        }
-        return builder.build();
-      }
-    }
-  }
-
-  /**
-   * A {@code PTransform} to perform a conversion of {@code PCollection<Row>} 
to {@code
-   * PCollection<Entity>}.
-   */
-  public static class RowToEntity extends PTransform<PCollection<Row>, 
PCollection<Entity>> {
-    private final Supplier<String> keySupplier;
-    private final String kind;
-    private final String keyField;
-
-    private RowToEntity(Supplier<String> keySupplier, String kind, String 
keyField) {
-      this.keySupplier = keySupplier;
-      this.kind = kind;
-      this.keyField = keyField;
-    }
-
-    @Override
-    public PCollection<Entity> expand(PCollection<Row> input) {
-      boolean isFieldPresent = 
input.getSchema().getFieldNames().contains(keyField);
-      if (isFieldPresent) {
-        if 
(!input.getSchema().getField(keyField).getType().getTypeName().equals(TypeName.BYTES))
 {
-          throw new IllegalStateException(
-              "Field `"
-                  + keyField
-                  + "` should of type `VARBINARY`. Please change the type or 
specify a field to"
-                  + " write the KEY value from via TableProperties.");
-        }
-        LOG.info("Field to use as Entity KEY is set to: `" + keyField + "`.");
-      }
-      return input.apply(ParDo.of(new RowToEntityConverter(isFieldPresent)));
-    }
-
-    /**
-     * Create a PTransform instance.
-     *
-     * @param keyField Row field containing a serialized {@code Key}, must be 
set when using user
-     *     specified keys.
-     * @param kind DataStore `Kind` data will be written to (required when 
generating random {@code
-     *     Key}s).
-     * @return {@code PTransform} instance for Row to Entity conversion.
-     */
-    public static RowToEntity create(String keyField, String kind) {
-      return new RowToEntity(
-          (Supplier<String> & Serializable) () -> 
UUID.randomUUID().toString(), kind, keyField);
-    }
-
-    @VisibleForTesting
-    static RowToEntity createTest(String keyString, String keyField, String 
kind) {
-      return new RowToEntity((Supplier<String> & Serializable) () -> 
keyString, kind, keyField);
-    }
-
-    @VisibleForTesting
-    class RowToEntityConverter extends DoFn<Row, Entity> {
-      private final boolean useNonRandomKey;
-
-      RowToEntityConverter(boolean useNonRandomKey) {
-        super();
-        this.useNonRandomKey = useNonRandomKey;
-      }
-
-      @DoFn.ProcessElement
-      public void processElement(ProcessContext context) {
-        Row row = context.element();
-
-        Schema schemaWithoutKeyField =
-            Schema.builder()
-                .addFields(
-                    row.getSchema().getFields().stream()
-                        .filter(field -> !field.getName().equals(keyField))
-                        .collect(Collectors.toList()))
-                .build();
-        Entity.Builder entityBuilder = 
constructEntityFromRow(schemaWithoutKeyField, row);
-        entityBuilder.setKey(constructKeyFromRow(row));
-
-        context.output(entityBuilder.build());
-      }
-
-      /**
-       * Converts an entire {@code Row} to an appropriate DataStore {@code 
Entity.Builder}.
-       *
-       * @param row {@code Row} to convert.
-       * @return resulting {@code Entity.Builder}.
-       */
-      private Entity.Builder constructEntityFromRow(Schema schema, Row row) {
-        Entity.Builder entityBuilder = Entity.newBuilder();
-        for (Schema.Field field : schema.getFields()) {
-          Value val = mapObjectToValue(row.getValue(field.getName()));
-          entityBuilder.putProperties(field.getName(), val);
-        }
-        return entityBuilder;
-      }
-
-      /**
-       * Create a random key for a {@code Row} without a keyField or use a 
user-specified key by
-       * parsing it from byte array when keyField is set.
-       *
-       * @param row {@code Row} to construct a key for.
-       * @return resulting {@code Key}.
-       */
-      private com.google.datastore.v1.Key constructKeyFromRow(Row row) {
-        if (!useNonRandomKey) {
-          // When key field is not present - use key supplier to generate a 
random one.
-          return makeKey(kind, keySupplier.get()).build();
-        }
-        byte[] keyBytes = row.getBytes(keyField);
-        try {
-          return com.google.datastore.v1.Key.parseFrom(keyBytes);
-        } catch (InvalidProtocolBufferException e) {
-          throw new IllegalStateException("Failed to parse DataStore key from 
bytes.");
-        }
-      }
-
-      /**
-       * Converts a {@code Row} value to an appropriate DataStore {@code 
Value} object.
-       *
-       * @param value {@code Row} value to convert.
-       * @throws IllegalStateException when no mapping function for object of 
given type exists.
-       * @return resulting {@code Value}.
-       */
-      private Value mapObjectToValue(Object value) {
-        if (value == null) {
-          return Value.newBuilder().build();
-        }
-
-        if (Boolean.class.equals(value.getClass())) {
-          return makeValue((Boolean) value).build();
-        } else if (Byte.class.equals(value.getClass())) {
-          return makeValue((Byte) value).build();
-        } else if (Long.class.equals(value.getClass())) {
-          return makeValue((Long) value).build();
-        } else if (Short.class.equals(value.getClass())) {
-          return makeValue((Short) value).build();
-        } else if (Integer.class.equals(value.getClass())) {
-          return makeValue((Integer) value).build();
-        } else if (Double.class.equals(value.getClass())) {
-          return makeValue((Double) value).build();
-        } else if (Float.class.equals(value.getClass())) {
-          return makeValue((Float) value).build();
-        } else if (String.class.equals(value.getClass())) {
-          return makeValue((String) value).build();
-        } else if (Instant.class.equals(value.getClass())) {
-          return makeValue(((Instant) value).toDate()).build();
-        } else if (byte[].class.equals(value.getClass())) {
-          return makeValue(ByteString.copyFrom((byte[]) value)).build();
-        } else if (value instanceof Row) {
-          // Recursive conversion to handle nested rows.
-          Row row = (Row) value;
-          return makeValue(constructEntityFromRow(row.getSchema(), 
row)).build();
-        } else if (value instanceof Collection) {
-          // Recursive to handle nested collections.
-          Collection<Object> collection = (Collection<Object>) value;
-          List<Value> arrayValues =
-              
collection.stream().map(this::mapObjectToValue).collect(Collectors.toList());
-          return makeValue(arrayValues).build();
-        }
-        throw new IllegalStateException(
-            "No conversion exists from type: " + value.getClass() + " to 
DataStove Value.");
-      }
-    }
-  }
-}
diff --git 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
index 9ecf668..b579e06 100644
--- 
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
+++ 
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
@@ -18,13 +18,21 @@
 package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
 
 import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import 
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import 
org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper;
 import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider;
+import 
org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
 
 /**
- * {@link TableProvider} for {@link DataStoreV1Table}.
+ * {@link TableProvider} for {@link DatastoreIO} for consumption by Beam SQL.
+ *
+ * <p>Passes the {@link DataStoreV1SchemaIOProvider} to the generalized table 
provider wrapper,
+ * {@link SchemaIOTableProviderWrapper}, for DataStoreV1 specific behavior.
  *
  * <p>A sample of DataStoreV1Table table is:
  *
@@ -39,7 +47,11 @@ import 
org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
  * }</pre>
  */
 @AutoService(TableProvider.class)
-public class DataStoreV1TableProvider extends InMemoryMetaTableProvider {
+public class DataStoreV1TableProvider extends SchemaIOTableProviderWrapper {
+  @Override
+  public SchemaIOProvider getSchemaIOProvider() {
+    return new DataStoreV1SchemaIOProvider();
+  }
 
   @Override
   public String getTableType() {
@@ -47,7 +59,18 @@ public class DataStoreV1TableProvider extends 
InMemoryMetaTableProvider {
   }
 
   @Override
-  public BeamSqlTable buildBeamSqlTable(Table table) {
-    return new DataStoreV1Table(table);
+  public BeamTableStatistics getTableStatistics(PipelineOptions options, 
SchemaIO schemaIO) {
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    long count =
+        DatastoreIO.v1()
+            .read()
+            .withProjectId(dataStoreV1SchemaIO.getProjectId())
+            .getNumEntities(options, dataStoreV1SchemaIO.getKind(), null);
+
+    if (count < 0) {
+      return BeamTableStatistics.BOUNDED_UNKNOWN;
+    }
+
+    return BeamTableStatistics.createBoundedTableStatistics((double) count);
   }
 }
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
index 3274846..c2c499a 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
+++ 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
@@ -38,11 +38,11 @@ import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.PipelineResult.State;
 import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
 import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
-import 
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.EntityToRow;
-import 
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.RowToEntity;
 import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
 import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
 import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
+import org.apache.beam.sdk.io.gcp.datastore.EntityToRow;
+import org.apache.beam.sdk.io.gcp.datastore.RowToEntity;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.testing.PAssert;
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java
 
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java
deleted file mode 100644
index 879add1..0000000
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
-
-import static 
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.DEFAULT_KEY_FIELD;
-import static 
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.KEY_FIELD_PROPERTY;
-import static org.apache.beam.sdk.schemas.Schema.toSchema;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-
-import com.alibaba.fastjson.JSON;
-import java.util.stream.Stream;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.schemas.Schema;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class DataStoreTableProviderTest {
-  private DataStoreV1TableProvider provider = new DataStoreV1TableProvider();
-
-  @Test
-  public void testGetTableType() {
-    assertEquals("datastoreV1", provider.getTableType());
-  }
-
-  @Test
-  public void testBuildBeamSqlTable() {
-    final String location = "projectId/batch_kind";
-    Table table = fakeTable("TEST", location);
-    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
-
-    assertNotNull(sqlTable);
-    assertTrue(sqlTable instanceof DataStoreV1Table);
-
-    DataStoreV1Table datastoreTable = (DataStoreV1Table) sqlTable;
-    assertEquals("projectId", datastoreTable.projectId);
-    assertEquals("batch_kind", datastoreTable.kind);
-    assertEquals(DEFAULT_KEY_FIELD, datastoreTable.keyField);
-  }
-
-  @Test
-  public void testTableProperty() {
-    final String location = "projectId/batch_kind";
-    Table table =
-        fakeTableWithProperties("TEST", location, "{ " + KEY_FIELD_PROPERTY + 
": \"field_name\" }");
-    BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
-
-    assertNotNull(sqlTable);
-    assertTrue(sqlTable instanceof DataStoreV1Table);
-
-    DataStoreV1Table datastoreTable = (DataStoreV1Table) sqlTable;
-    assertEquals("projectId", datastoreTable.projectId);
-    assertEquals("batch_kind", datastoreTable.kind);
-    assertEquals("field_name", datastoreTable.keyField);
-  }
-
-  @Test
-  public void testTableProperty_nullValue_throwsException() {
-    final String location = "projectId/batch_kind";
-    Table table = fakeTableWithProperties("TEST", location, "{ " + 
KEY_FIELD_PROPERTY + ": \"\" }");
-    assertThrows(IllegalArgumentException.class, () -> 
provider.buildBeamSqlTable(table));
-  }
-
-  private static Table fakeTable(String name, String location) {
-    return Table.builder()
-        .name(name)
-        .comment(name + " table")
-        .location(location)
-        .schema(
-            Stream.of(
-                    Schema.Field.nullable("id", Schema.FieldType.INT32),
-                    Schema.Field.nullable("name", Schema.FieldType.STRING))
-                .collect(toSchema()))
-        .type("datastoreV1")
-        .build();
-  }
-
-  private static Table fakeTableWithProperties(String name, String location, 
String properties) {
-    return Table.builder()
-        .name(name)
-        .comment(name + " table")
-        .location(location)
-        .schema(
-            Stream.of(
-                    Schema.Field.nullable("id", Schema.FieldType.INT32),
-                    Schema.Field.nullable("name", Schema.FieldType.STRING))
-                .collect(toSchema()))
-        .type("datastoreV1")
-        .properties(JSON.parseObject(properties))
-        .build();
-  }
-}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProvider.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProvider.java
new file mode 100644
index 0000000..216ccf4
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProvider.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import com.google.auto.service.AutoService;
+import com.google.datastore.v1.Query;
+import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidLocationException;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing 
payloads with {@link
+ * DatastoreIO}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class DataStoreV1SchemaIOProvider implements SchemaIOProvider {
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  private static final Pattern locationPattern = 
Pattern.compile("(?<projectId>.+)/(?<kind>.+)");
+
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "datastoreV1";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is 
distinct from the schema
+   * of the data source itself.
+   *
+   * <p>Configuration Parameters:
+   *
+   * <ul>
+   *   <li>STRING keyField: The name of the Beam schema field to map the 
DataStore entity key.
+   *       Defaults to {@code __key__} if not set or null.
+   * </ul>
+   */
+  @Override
+  public Schema configurationSchema() {
+    // TODO: allow users to specify a name of the field to store a key value 
via TableProperties.
+    return Schema.builder().addNullableField(KEY_FIELD_PROPERTY, 
Schema.FieldType.STRING).build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the 
schema of the data that
+   * resides there, and some IO-specific configuration object.
+   */
+  @Override
+  public DataStoreV1SchemaIO from(String location, Row configuration, Schema 
dataSchema) {
+    return new DataStoreV1SchemaIO(location, configuration, dataSchema);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return true;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** An abstraction to create schema aware IOs. */
+  public static class DataStoreV1SchemaIO implements SchemaIO, Serializable {
+    protected final Schema dataSchema;
+    protected final String location;
+    protected final String kind;
+    protected final String projectId;
+    protected final String keyField;
+
+    private DataStoreV1SchemaIO(String location, Row config, Schema 
dataSchema) {
+      this.location = location;
+      this.dataSchema = dataSchema;
+      this.keyField = determineKeyField(config.getString(KEY_FIELD_PROPERTY));
+
+      Matcher matcher = locationPattern.matcher(this.location);
+      validateLocation(location, matcher);
+
+      this.kind = matcher.group("kind");
+      this.projectId = matcher.group("projectId");
+    }
+
+    @Override
+    public Schema schema() {
+      return dataSchema;
+    }
+
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      return new PTransform<PBegin, PCollection<Row>>() {
+        @Override
+        public PCollection<Row> expand(PBegin begin) {
+          Query.Builder q = Query.newBuilder();
+          q.addKindBuilder().setName(kind);
+          Query query = q.build();
+
+          DatastoreV1.Read readInstance =
+              
DatastoreIO.v1().read().withProjectId(projectId).withQuery(query);
+
+          return begin
+              .apply("Read Datastore Entities", readInstance)
+              .apply(
+                  "Convert Datastore Entities to Rows", 
EntityToRow.create(dataSchema, keyField));
+        }
+      };
+    }
+
+    @Override
+    public PTransform<PCollection<Row>, POutput> buildWriter() {
+      return new PTransform<PCollection<Row>, POutput>() {
+        @Override
+        public POutput expand(PCollection<Row> input) {
+          return input
+              .apply("Convert Rows to Datastore Entities", 
RowToEntity.create(keyField, kind))
+              .apply("Write Datastore Entities", 
DatastoreIO.v1().write().withProjectId(projectId));
+        }
+      };
+    }
+
+    public String getProjectId() {
+      return projectId;
+    }
+
+    public String getKind() {
+      return kind;
+    }
+
+    private String determineKeyField(String configKey) {
+      if (configKey != null && configKey.isEmpty()) {
+        throw new InvalidConfigurationException(
+            String.format("'%s' property cannot be null.", 
KEY_FIELD_PROPERTY));
+      } else if (configKey != null) {
+        return configKey;
+      } else {
+        return DEFAULT_KEY_FIELD;
+      }
+    }
+
+    private void validateLocation(String location, Matcher matcher) {
+      // TODO: allow users to specify a namespace in a location string.
+      if (location == null) {
+        throw new InvalidLocationException("DataStoreV1 location must be set. 
");
+      }
+      if (!matcher.matches()) {
+        throw new InvalidLocationException(
+            "DataStoreV1 location must be in the following format: 
'projectId/kind'"
+                + " but was:"
+                + location);
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java
new file mode 100644
index 0000000..b86323c
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@code PTransform} to perform a conversion of {@link Entity} to {@link 
Row}. */
+public class EntityToRow extends PTransform<PCollection<Entity>, 
PCollection<Row>> {
+  private final Schema schema;
+  private final String keyField;
+  private static final Logger LOG = 
LoggerFactory.getLogger(DataStoreV1SchemaIOProvider.class);
+
+  private EntityToRow(Schema schema, String keyField) {
+    this.schema = schema;
+    this.keyField = keyField;
+
+    if (schema.getFieldNames().contains(keyField)) {
+      if 
(!schema.getField(keyField).getType().getTypeName().equals(Schema.TypeName.BYTES))
 {
+        throw new IllegalStateException(
+            "Field `"
+                + keyField
+                + "` should of type `BYTES`. Please change the type or specify 
a field to"
+                + " store the KEY value.");
+      }
+      LOG.info("Entity KEY will be stored under `" + keyField + "` field.");
+    }
+  }
+
+  /**
+   * Create a PTransform instance.
+   *
+   * @param schema {@code Schema} of the target row.
+   * @param keyField A name of the row field to store the {@code Key} in.
+   * @return {@code PTransform} instance for Entity to Row conversion.
+   */
+  public static EntityToRow create(Schema schema, String keyField) {
+    return new EntityToRow(schema, keyField);
+  }
+
+  @Override
+  public PCollection<Row> expand(PCollection<Entity> input) {
+    return input.apply(ParDo.of(new 
EntityToRow.EntityToRowConverter())).setRowSchema(schema);
+  }
+
+  class EntityToRowConverter extends DoFn<Entity, Row> {
+
+    @DoFn.ProcessElement
+    public void processElement(ProcessContext context) {
+      Entity entity = context.element();
+      ImmutableMap.Builder<String, Value> mapBuilder = ImmutableMap.builder();
+      mapBuilder.put(keyField, makeValue(entity.getKey()).build());
+      mapBuilder.putAll(entity.getPropertiesMap());
+
+      context.output(extractRowFromProperties(schema, mapBuilder.build()));
+    }
+
+    /**
+     * Convert DataStore {@code Value} to Beam type.
+     *
+     * @param currentFieldType Beam {@code Schema.FieldType} to convert to 
(used for {@code Row} and
+     *     {@code Array}).
+     * @param val DataStore {@code Value}.
+     * @return resulting Beam type.
+     */
+    private Object convertValueToObject(Schema.FieldType currentFieldType, 
Value val) {
+      Value.ValueTypeCase typeCase = val.getValueTypeCase();
+
+      switch (typeCase) {
+        case NULL_VALUE:
+        case VALUETYPE_NOT_SET:
+          return null;
+        case BOOLEAN_VALUE:
+          return val.getBooleanValue();
+        case INTEGER_VALUE:
+          return val.getIntegerValue();
+        case DOUBLE_VALUE:
+          return val.getDoubleValue();
+        case TIMESTAMP_VALUE:
+          com.google.protobuf.Timestamp time = val.getTimestampValue();
+          long millis = time.getSeconds() * 1000 + time.getNanos() / 1000;
+          return Instant.ofEpochMilli(millis).toDateTime();
+        case STRING_VALUE:
+          return val.getStringValue();
+        case KEY_VALUE:
+          return val.getKeyValue().toByteArray();
+        case BLOB_VALUE:
+          return val.getBlobValue().toByteArray();
+        case ENTITY_VALUE:
+          // Recursive mapping for row type.
+          Schema rowSchema = currentFieldType.getRowSchema();
+          assert rowSchema != null;
+          Entity entity = val.getEntityValue();
+          return extractRowFromProperties(rowSchema, 
entity.getPropertiesMap());
+        case ARRAY_VALUE:
+          // Recursive mapping for collection type.
+          Schema.FieldType elementType = 
currentFieldType.getCollectionElementType();
+          List<Value> valueList = val.getArrayValue().getValuesList();
+          return valueList.stream()
+              .map(v -> convertValueToObject(elementType, v))
+              .collect(Collectors.toList());
+        case GEO_POINT_VALUE:
+        default:
+          throw new IllegalStateException(
+              "No conversion exists from type: "
+                  + val.getValueTypeCase().name()
+                  + " to Beam type.");
+      }
+    }
+
+    /**
+     * Converts all properties of an {@code Entity} to Beam {@code Row}.
+     *
+     * @param schema Target row {@code Schema}.
+     * @param values A map of property names and values.
+     * @return resulting Beam {@code Row}.
+     */
+    private Row extractRowFromProperties(Schema schema, Map<String, Value> 
values) {
+      Row.Builder builder = Row.withSchema(schema);
+      // It is not a guarantee that the values will be in the same order as 
the schema.
+      // Maybe metadata:
+      // 
https://cloud.google.com/appengine/docs/standard/python/datastore/metadataqueries
+      // TODO: figure out in what order the elements are in (without relying 
on Beam schema).
+      for (Schema.Field field : schema.getFields()) {
+        Value val = values.get(field.getName());
+        builder.addValue(convertValueToObject(field.getType(), val));
+      }
+      return builder.build();
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java
 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java
new file mode 100644
index 0000000..02ca67c
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@code PTransform} to perform a conversion of {@link Row} to {@link Entity}.
+ *
+ * <p>If the input schema contains {@code keyField}, its BYTES value is parsed as the entity {@code
+ * Key} and the field is not written as a property. Otherwise each entity gets a key generated from
+ * {@code kind} and the key supplier.
+ */
+public class RowToEntity extends PTransform<PCollection<Row>, PCollection<Entity>> {
+  private static final Logger LOG = LoggerFactory.getLogger(RowToEntity.class);
+  private final Supplier<String> keySupplier;
+  private final String kind;
+  private final String keyField;
+
+  private RowToEntity(Supplier<String> keySupplier, String kind, String keyField) {
+    this.keySupplier = keySupplier;
+    this.kind = kind;
+    this.keyField = keyField;
+  }
+
+  @Override
+  public PCollection<Entity> expand(PCollection<Row> input) {
+    Schema inputSchema = input.getSchema();
+    boolean isFieldPresent = inputSchema.getFieldNames().contains(keyField);
+    if (isFieldPresent) {
+      Schema.TypeName keyFieldType = inputSchema.getField(keyField).getType().getTypeName();
+      // The key is parsed from serialized protobuf bytes, so the field must be BYTES.
+      if (!keyFieldType.equals(Schema.TypeName.BYTES)) {
+        throw new IllegalStateException(
+            "Field `"
+                + keyField
+                + "` should be of type `BYTES`. Please change the type or specify a field to"
+                + " write the KEY value from.");
+      }
+      LOG.info("Field to use as Entity KEY is set to: `{}`.", keyField);
+    }
+    return input.apply(ParDo.of(new RowToEntity.RowToEntityConverter(isFieldPresent)));
+  }
+
+  /**
+   * Create a PTransform instance.
+   *
+   * @param keyField Row field containing a serialized {@code Key}, must be set when using user
+   *     specified keys.
+   * @param kind DataStore `Kind` data will be written to (required when generating random {@code
+   *     Key}s).
+   * @return {@code PTransform} instance for Row to Entity conversion.
+   */
+  public static RowToEntity create(String keyField, String kind) {
+    return new RowToEntity(
+        (Supplier<String> & Serializable) () -> UUID.randomUUID().toString(), kind, keyField);
+  }
+
+  /**
+   * Like {@link #create(String, String)}, but every generated key uses the fixed {@code keyString}
+   * instead of a random UUID (presumably so tests get deterministic keys).
+   */
+  public static RowToEntity createTest(String keyString, String keyField, String kind) {
+    return new RowToEntity((Supplier<String> & Serializable) () -> keyString, kind, keyField);
+  }
+
+  /** Converts a single {@link Row} into an {@link Entity} with either a parsed or generated key. */
+  class RowToEntityConverter extends DoFn<Row, Entity> {
+    private final boolean useNonRandomKey;
+
+    RowToEntityConverter(boolean useNonRandomKey) {
+      super();
+      this.useNonRandomKey = useNonRandomKey;
+    }
+
+    @DoFn.ProcessElement
+    public void processElement(ProcessContext context) {
+      Row row = context.element();
+      // The key field becomes the Entity key, so it is skipped when writing properties. Skipping it
+      // in place avoids building a new Schema for every element.
+      Entity.Builder entityBuilder = constructEntityFromRow(row.getSchema(), row, keyField);
+      entityBuilder.setKey(constructKeyFromRow(row));
+
+      context.output(entityBuilder.build());
+    }
+
+    /**
+     * Converts a {@code Row} to an appropriate DataStore {@code Entity.Builder}.
+     *
+     * @param schema schema whose fields are written as properties.
+     * @param row {@code Row} to convert.
+     * @param excludedField name of a field to leave out, or {@code null} to keep every field.
+     * @return resulting {@code Entity.Builder}.
+     */
+    private Entity.Builder constructEntityFromRow(Schema schema, Row row, String excludedField) {
+      Entity.Builder entityBuilder = Entity.newBuilder();
+      for (Schema.Field field : schema.getFields()) {
+        String name = field.getName();
+        if (name.equals(excludedField)) {
+          continue;
+        }
+        entityBuilder.putProperties(name, mapObjectToValue(row.getValue(name)));
+      }
+      return entityBuilder;
+    }
+
+    /**
+     * Create a random key for a {@code Row} without a keyField or use a user-specified key by
+     * parsing it from byte array when keyField is set.
+     *
+     * @param row {@code Row} to construct a key for.
+     * @return resulting {@code Key}.
+     * @throws IllegalStateException if the key bytes are not a valid serialized {@code Key}.
+     */
+    private com.google.datastore.v1.Key constructKeyFromRow(Row row) {
+      if (!useNonRandomKey) {
+        // When key field is not present - use key supplier to generate a random one.
+        return makeKey(kind, keySupplier.get()).build();
+      }
+      byte[] keyBytes = row.getBytes(keyField);
+      try {
+        return com.google.datastore.v1.Key.parseFrom(keyBytes);
+      } catch (InvalidProtocolBufferException e) {
+        throw new IllegalStateException("Failed to parse DataStore key from bytes.", e);
+      }
+    }
+
+    /**
+     * Converts a {@code Row} value to an appropriate DataStore {@code Value} object.
+     *
+     * @param value {@code Row} value to convert.
+     * @return resulting {@code Value}; an unset {@code Value} for {@code null}.
+     * @throws IllegalStateException when no mapping function for object of given type exists.
+     */
+    private Value mapObjectToValue(Object value) {
+      if (value == null) {
+        return Value.newBuilder().build();
+      }
+
+      if (Boolean.class.equals(value.getClass())) {
+        return makeValue((Boolean) value).build();
+      } else if (Byte.class.equals(value.getClass())) {
+        return makeValue((Byte) value).build();
+      } else if (Long.class.equals(value.getClass())) {
+        return makeValue((Long) value).build();
+      } else if (Short.class.equals(value.getClass())) {
+        return makeValue((Short) value).build();
+      } else if (Integer.class.equals(value.getClass())) {
+        return makeValue((Integer) value).build();
+      } else if (Double.class.equals(value.getClass())) {
+        return makeValue((Double) value).build();
+      } else if (Float.class.equals(value.getClass())) {
+        return makeValue((Float) value).build();
+      } else if (String.class.equals(value.getClass())) {
+        return makeValue((String) value).build();
+      } else if (Instant.class.equals(value.getClass())) {
+        return makeValue(((Instant) value).toDate()).build();
+      } else if (byte[].class.equals(value.getClass())) {
+        return makeValue(ByteString.copyFrom((byte[]) value)).build();
+      } else if (value instanceof Row) {
+        // Recursive conversion to handle nested rows; nested rows keep all their fields.
+        Row row = (Row) value;
+        return makeValue(constructEntityFromRow(row.getSchema(), row, null)).build();
+      } else if (value instanceof Collection) {
+        // Recursive to handle nested collections.
+        List<Value> arrayValues =
+            ((Collection<?>) value)
+                .stream().map(this::mapObjectToValue).collect(Collectors.toList());
+        return makeValue(arrayValues).build();
+      }
+      throw new IllegalStateException(
+          "No conversion exists from type: " + value.getClass() + " to DataStore Value.");
+    }
+  }
+}
diff --git 
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
new file mode 100644
index 0000000..1474744
--- /dev/null
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import 
org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link DataStoreV1SchemaIOProvider}. */
+@RunWith(JUnit4.class)
+public class DataStoreV1SchemaIOProviderTest {
+  // Alias the provider's own constants so these tests cannot drift from the implementation.
+  static final String DEFAULT_KEY_FIELD = DataStoreV1SchemaIOProvider.DEFAULT_KEY_FIELD;
+  public static final String KEY_FIELD_PROPERTY = DataStoreV1SchemaIOProvider.KEY_FIELD_PROPERTY;
+  private static final String LOCATION = "projectId/batch_kind";
+  private final DataStoreV1SchemaIOProvider provider = new DataStoreV1SchemaIOProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.identifier());
+  }
+
+  @Test
+  public void testBuildSchemaIO_defaultKeyField() {
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = buildSchemaIO(configurationWithKeyField(null));
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals(DEFAULT_KEY_FIELD, dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty() {
+    DataStoreV1SchemaIO dataStoreV1SchemaIO =
+        buildSchemaIO(configurationWithKeyField("field_name"));
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals("field_name", dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty_emptyValue_throwsException() {
+    Row configuration = configurationWithKeyField("");
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> provider.from(LOCATION, configuration, generateDataSchema()));
+  }
+
+  @Test
+  public void testInvalidLocation_throwsException() {
+    Row configuration = configurationWithKeyField(null);
+    assertThrows(
+        org.apache.beam.sdk.schemas.io.InvalidLocationException.class,
+        () -> provider.from("projectIdWithoutKind", configuration, generateDataSchema()));
+  }
+
+  /** Builds a SchemaIO for {@link #LOCATION} and checks that it is a {@link DataStoreV1SchemaIO}. */
+  private DataStoreV1SchemaIO buildSchemaIO(Row configuration) {
+    SchemaIO schemaIO = provider.from(LOCATION, configuration, generateDataSchema());
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+    return (DataStoreV1SchemaIO) schemaIO;
+  }
+
+  /** Builds a provider configuration row with the given (possibly null) key field name. */
+  private Row configurationWithKeyField(String keyField) {
+    return Row.withSchema(provider.configurationSchema())
+        .withFieldValue(KEY_FIELD_PROPERTY, keyField)
+        .build();
+  }
+
+  private static Schema generateDataSchema() {
+    return Schema.builder()
+        .addNullableField("id", Schema.FieldType.INT32)
+        .addNullableField("name", Schema.FieldType.STRING)
+        .build();
+  }
+}
diff --git 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java
 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java
similarity index 90%
rename from 
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java
rename to 
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java
index a682b2a..32090bc 100644
--- 
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java
+++ 
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java
@@ -15,13 +15,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
+package org.apache.beam.sdk.io.gcp.datastore;
 
 import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
 import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
-import static 
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARBINARY;
-import static 
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.DEFAULT_KEY_FIELD;
-import static 
org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
 import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
 import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTES;
 import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
@@ -39,9 +36,6 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.UUID;
 import java.util.stream.Collectors;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import 
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.EntityToRow;
-import 
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.RowToEntity;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.schemas.Schema.FieldType;
 import org.apache.beam.sdk.testing.PAssert;
@@ -50,17 +44,20 @@ import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.Row;
 import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 
 @RunWith(JUnit4.class)
-public class DataStoreTableTest {
+public class EntityToRowRowToEntityTest {
   private static final String KIND = "kind";
   private static final String UUID_VALUE = UUID.randomUUID().toString();
   private static final Key.Builder KEY = makeKey(KIND, UUID_VALUE);
   private static final DateTime DATE_TIME = 
parseTimestampWithUTCTimeZone("2018-05-28 20:17:40");
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  private static final FieldType VARBINARY = FieldType.BYTES;
 
   private static final Schema NESTED_ROW_SCHEMA =
       Schema.builder().addNullableField("nestedLong", INT64).build();
@@ -74,7 +71,7 @@ public class DataStoreTableTest {
           .addNullableField("rowArray", 
array(FieldType.row(NESTED_ROW_SCHEMA)))
           .addNullableField("double", DOUBLE)
           .addNullableField("bytes", BYTES)
-          .addNullableField("string", CalciteUtils.CHAR)
+          .addNullableField("string", STRING)
           .addNullableField("nullable", INT64)
           .build();
   private static final Entity NESTED_ENTITY =
@@ -187,4 +184,12 @@ public class DataStoreTableTest {
   /** Builds a {@link Row} of {@code schema} from positional field values. */
   private static Row row(Schema schema, Object... values) {
     Row.Builder builder = Row.withSchema(schema);
     return builder.addValues(values).build();
   }
+
+  public static DateTime parseTimestampWithUTCTimeZone(String str) {
+    if (str.indexOf('.') == -1) {
+      return DateTimeFormat.forPattern("yyyy-MM-dd 
HH:mm:ss").withZoneUTC().parseDateTime(str);
+    } else {
+      return DateTimeFormat.forPattern("yyyy-MM-dd 
HH:mm:ss.SSS").withZoneUTC().parseDateTime(str);
+    }
+  }
 }

Reply via email to