This is an automated email from the ASF dual-hosted git repository.
bhulette pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new ab12f12 [BEAM-10557] Implemented SchemaIOProvider for DataStoreV1,
Refactored tests (#12341)
ab12f12 is described below
commit ab12f121fd0416defc3752980d2c2daa8e91fb0c
Author: sclukas77 <[email protected]>
AuthorDate: Tue Aug 18 17:32:26 2020 +0000
[BEAM-10557] Implemented SchemaIOProvider for DataStoreV1, Refactored tests
(#12341)
* Implemented SchemaIOProvider for DataStoreV1, refactored tests
* Modified SchemaIOTableProviderWrapper#getTableStatistics
* Update
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
Co-authored-by: Scott Lukas <[email protected]>
Co-authored-by: Brian Hulette <[email protected]>
---
.../sdk/schemas/io/InvalidLocationException.java | 39 ++
.../provider/SchemaIOTableProviderWrapper.java | 4 +-
.../meta/provider/datastore/DataStoreV1Table.java | 435 ---------------------
.../datastore/DataStoreV1TableProvider.java | 37 +-
.../provider/datastore/DataStoreReadWriteIT.java | 4 +-
.../datastore/DataStoreTableProviderTest.java | 112 ------
.../gcp/datastore/DataStoreV1SchemaIOProvider.java | 179 +++++++++
.../beam/sdk/io/gcp/datastore/EntityToRow.java | 161 ++++++++
.../beam/sdk/io/gcp/datastore/RowToEntity.java | 202 ++++++++++
.../datastore/DataStoreV1SchemaIOProviderTest.java | 104 +++++
.../gcp/datastore/EntityToRowRowToEntityTest.java} | 23 +-
11 files changed, 733 insertions(+), 567 deletions(-)
diff --git
a/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/InvalidLocationException.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/InvalidLocationException.java
new file mode 100644
index 0000000..4cfb5e9
--- /dev/null
+++
b/sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/io/InvalidLocationException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.schemas.io;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.annotations.Internal;
+
+/** Exception thrown when the location for a {@link SchemaIO} is invalid. */
+@Internal
+@Experimental(Kind.SCHEMAS)
+public class InvalidLocationException extends IllegalArgumentException {
+ public InvalidLocationException(String msg) {
+ super(msg);
+ }
+
+ public InvalidLocationException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+ public InvalidLocationException(Throwable cause) {
+ super(cause);
+ }
+}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
index c5044a5..e8292fb 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/SchemaIOTableProviderWrapper.java
@@ -80,7 +80,7 @@ public abstract class SchemaIOTableProviderWrapper extends
InMemoryMetaTableProv
}
}
- private BeamTableStatistics getTableStatistics(PipelineOptions options) {
+ protected BeamTableStatistics getTableStatistics(PipelineOptions options,
SchemaIO schemaIO) {
if (isBounded().equals(PCollection.IsBounded.BOUNDED)) {
return BeamTableStatistics.BOUNDED_UNKNOWN;
}
@@ -123,7 +123,7 @@ public abstract class SchemaIOTableProviderWrapper extends
InMemoryMetaTableProv
@Override
public BeamTableStatistics getTableStatistics(PipelineOptions options) {
- return SchemaIOTableProviderWrapper.this.getTableStatistics(options);
+ // Forward this table's SchemaIO so provider overrides (e.g. DataStoreV1TableProvider)
+ // can derive IO-specific statistics from it.
+ return SchemaIOTableProviderWrapper.this.getTableStatistics(options,
schemaIO);
}
}
}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java
deleted file mode 100644
index 7efdb13..0000000
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1Table.java
+++ /dev/null
@@ -1,435 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
-
-import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
-import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
-
-import com.alibaba.fastjson.JSONObject;
-import com.google.datastore.v1.Entity;
-import com.google.datastore.v1.Query;
-import com.google.datastore.v1.Value;
-import com.google.datastore.v1.Value.ValueTypeCase;
-import com.google.protobuf.ByteString;
-import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.function.Supplier;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.stream.Collectors;
-import org.apache.beam.sdk.annotations.Experimental;
-import org.apache.beam.sdk.annotations.Internal;
-import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
-import org.apache.beam.sdk.extensions.sql.meta.SchemaBaseBeamTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.extensions.sql.meta.provider.InvalidTableException;
-import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
-import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
-import org.apache.beam.sdk.options.PipelineOptions;
-import org.apache.beam.sdk.schemas.Schema;
-import org.apache.beam.sdk.schemas.Schema.FieldType;
-import org.apache.beam.sdk.schemas.Schema.TypeName;
-import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.values.PBegin;
-import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PCollection.IsBounded;
-import org.apache.beam.sdk.values.POutput;
-import org.apache.beam.sdk.values.Row;
-import
org.apache.beam.vendor.calcite.v1_20_0.com.google.common.annotations.VisibleForTesting;
-import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
-import org.joda.time.Instant;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Internal
-@Experimental
-class DataStoreV1Table extends SchemaBaseBeamTable implements Serializable {
- public static final String KEY_FIELD_PROPERTY = "keyField";
- @VisibleForTesting static final String DEFAULT_KEY_FIELD = "__key__";
- private static final Logger LOG =
LoggerFactory.getLogger(DataStoreV1Table.class);
- // Should match: `projectId/kind`.
- private static final Pattern locationPattern =
Pattern.compile("(?<projectId>.+)/(?<kind>.+)");
- @VisibleForTesting final String keyField;
- @VisibleForTesting final String projectId;
- @VisibleForTesting final String kind;
-
- DataStoreV1Table(Table table) {
- super(table.getSchema());
-
- // TODO: allow users to specify a name of the field to store a key value
via TableProperties.
- JSONObject properties = table.getProperties();
- if (properties.containsKey(KEY_FIELD_PROPERTY)) {
- String field = properties.getString(KEY_FIELD_PROPERTY);
- if (!(field != null && !field.isEmpty())) {
- throw new InvalidTableException(
- String.format("'%s' property cannot be null.",
KEY_FIELD_PROPERTY));
- }
- keyField = field;
- } else {
- keyField = DEFAULT_KEY_FIELD;
- }
- // TODO: allow users to specify a namespace in a location string.
- String location = table.getLocation();
- if (location == null) {
- throw new InvalidTableException("DataStoreV1 location must be set: " +
table);
- }
- Matcher matcher = locationPattern.matcher(location);
-
- if (!matcher.matches()) {
- throw new InvalidTableException(
- "DataStoreV1 location must be in the following format:
'projectId/kind'"
- + " but was:"
- + location);
- }
-
- this.projectId = matcher.group("projectId");
- this.kind = matcher.group("kind");
- }
-
- @Override
- public PCollection<Row> buildIOReader(PBegin begin) {
- Query.Builder q = Query.newBuilder();
- q.addKindBuilder().setName(kind);
- Query query = q.build();
-
- DatastoreV1.Read readInstance =
- DatastoreIO.v1().read().withProjectId(projectId).withQuery(query);
-
- return begin
- .apply("Read Datastore Entities", readInstance)
- .apply("Convert Datastore Entities to Rows",
EntityToRow.create(getSchema(), keyField));
- }
-
- @Override
- public POutput buildIOWriter(PCollection<Row> input) {
- return input
- .apply("Convert Rows to Datastore Entities",
RowToEntity.create(keyField, kind))
- .apply("Write Datastore Entities",
DatastoreIO.v1().write().withProjectId(projectId));
- }
-
- @Override
- public IsBounded isBounded() {
- return IsBounded.BOUNDED;
- }
-
- @Override
- public BeamTableStatistics getTableStatistics(PipelineOptions options) {
- long count =
-
DatastoreIO.v1().read().withProjectId(projectId).getNumEntities(options, kind,
null);
-
- if (count < 0) {
- return BeamTableStatistics.BOUNDED_UNKNOWN;
- }
-
- return BeamTableStatistics.createBoundedTableStatistics((double) count);
- }
-
- /**
- * A {@code PTransform} to perform a conversion of {@code
PCollection<Entity>} to {@code
- * PCollection<Row>}.
- */
- public static class EntityToRow extends PTransform<PCollection<Entity>,
PCollection<Row>> {
- private final Schema schema;
- private final String keyField;
-
- private EntityToRow(Schema schema, String keyField) {
- this.schema = schema;
- this.keyField = keyField;
-
- if (schema.getFieldNames().contains(keyField)) {
- if
(!schema.getField(keyField).getType().getTypeName().equals(TypeName.BYTES)) {
- throw new IllegalStateException(
- "Field `"
- + keyField
- + "` should of type `VARBINARY`. Please change the type or
specify a field to"
- + " store the KEY value.");
- }
- LOG.info("Entity KEY will be stored under `" + keyField + "` field.");
- }
- }
-
- /**
- * Create a PTransform instance.
- *
- * @param schema {@code Schema} of the target row.
- * @param keyField A name of the row field to store the {@code Key} in.
- * @return {@code PTransform} instance for Entity to Row conversion.
- */
- public static EntityToRow create(Schema schema, String keyField) {
- return new EntityToRow(schema, keyField);
- }
-
- @Override
- public PCollection<Row> expand(PCollection<Entity> input) {
- return input.apply(ParDo.of(new
EntityToRowConverter())).setRowSchema(schema);
- }
-
- @VisibleForTesting
- class EntityToRowConverter extends DoFn<Entity, Row> {
-
- @DoFn.ProcessElement
- public void processElement(ProcessContext context) {
- Entity entity = context.element();
- ImmutableMap.Builder<String, Value> mapBuilder =
ImmutableMap.builder();
- mapBuilder.put(keyField, makeValue(entity.getKey()).build());
- mapBuilder.putAll(entity.getPropertiesMap());
-
- context.output(extractRowFromProperties(schema, mapBuilder.build()));
- }
-
- /**
- * Convert DataStore {@code Value} to Beam type.
- *
- * @param currentFieldType Beam {@code Schema.FieldType} to convert to
(used for {@code Row}
- * and {@code Array}).
- * @param val DataStore {@code Value}.
- * @return resulting Beam type.
- */
- private Object convertValueToObject(FieldType currentFieldType, Value
val) {
- ValueTypeCase typeCase = val.getValueTypeCase();
-
- switch (typeCase) {
- case NULL_VALUE:
- case VALUETYPE_NOT_SET:
- return null;
- case BOOLEAN_VALUE:
- return val.getBooleanValue();
- case INTEGER_VALUE:
- return val.getIntegerValue();
- case DOUBLE_VALUE:
- return val.getDoubleValue();
- case TIMESTAMP_VALUE:
- com.google.protobuf.Timestamp time = val.getTimestampValue();
- long millis = time.getSeconds() * 1000 + time.getNanos() / 1000;
- return Instant.ofEpochMilli(millis).toDateTime();
- case STRING_VALUE:
- return val.getStringValue();
- case KEY_VALUE:
- return val.getKeyValue().toByteArray();
- case BLOB_VALUE:
- return val.getBlobValue().toByteArray();
- case ENTITY_VALUE:
- // Recursive mapping for row type.
- Schema rowSchema = currentFieldType.getRowSchema();
- assert rowSchema != null;
- Entity entity = val.getEntityValue();
- return extractRowFromProperties(rowSchema,
entity.getPropertiesMap());
- case ARRAY_VALUE:
- // Recursive mapping for collection type.
- FieldType elementType =
currentFieldType.getCollectionElementType();
- List<Value> valueList = val.getArrayValue().getValuesList();
- return valueList.stream()
- .map(v -> convertValueToObject(elementType, v))
- .collect(Collectors.toList());
- case GEO_POINT_VALUE:
- default:
- throw new IllegalStateException(
- "No conversion exists from type: "
- + val.getValueTypeCase().name()
- + " to Beam type.");
- }
- }
-
- /**
- * Converts all properties of an {@code Entity} to Beam {@code Row}.
- *
- * @param schema Target row {@code Schema}.
- * @param values A map of property names and values.
- * @return resulting Beam {@code Row}.
- */
- private Row extractRowFromProperties(Schema schema, Map<String, Value>
values) {
- Row.Builder builder = Row.withSchema(schema);
- // It is not a guarantee that the values will be in the same order as
the schema.
- // Maybe metadata:
- //
https://cloud.google.com/appengine/docs/standard/python/datastore/metadataqueries
- // TODO: figure out in what order the elements are in (without relying
on Beam schema).
- for (Schema.Field field : schema.getFields()) {
- Value val = values.get(field.getName());
- builder.addValue(convertValueToObject(field.getType(), val));
- }
- return builder.build();
- }
- }
- }
-
- /**
- * A {@code PTransform} to perform a conversion of {@code PCollection<Row>}
to {@code
- * PCollection<Entity>}.
- */
- public static class RowToEntity extends PTransform<PCollection<Row>,
PCollection<Entity>> {
- private final Supplier<String> keySupplier;
- private final String kind;
- private final String keyField;
-
- private RowToEntity(Supplier<String> keySupplier, String kind, String
keyField) {
- this.keySupplier = keySupplier;
- this.kind = kind;
- this.keyField = keyField;
- }
-
- @Override
- public PCollection<Entity> expand(PCollection<Row> input) {
- boolean isFieldPresent =
input.getSchema().getFieldNames().contains(keyField);
- if (isFieldPresent) {
- if
(!input.getSchema().getField(keyField).getType().getTypeName().equals(TypeName.BYTES))
{
- throw new IllegalStateException(
- "Field `"
- + keyField
- + "` should of type `VARBINARY`. Please change the type or
specify a field to"
- + " write the KEY value from via TableProperties.");
- }
- LOG.info("Field to use as Entity KEY is set to: `" + keyField + "`.");
- }
- return input.apply(ParDo.of(new RowToEntityConverter(isFieldPresent)));
- }
-
- /**
- * Create a PTransform instance.
- *
- * @param keyField Row field containing a serialized {@code Key}, must be
set when using user
- * specified keys.
- * @param kind DataStore `Kind` data will be written to (required when
generating random {@code
- * Key}s).
- * @return {@code PTransform} instance for Row to Entity conversion.
- */
- public static RowToEntity create(String keyField, String kind) {
- return new RowToEntity(
- (Supplier<String> & Serializable) () ->
UUID.randomUUID().toString(), kind, keyField);
- }
-
- @VisibleForTesting
- static RowToEntity createTest(String keyString, String keyField, String
kind) {
- return new RowToEntity((Supplier<String> & Serializable) () ->
keyString, kind, keyField);
- }
-
- @VisibleForTesting
- class RowToEntityConverter extends DoFn<Row, Entity> {
- private final boolean useNonRandomKey;
-
- RowToEntityConverter(boolean useNonRandomKey) {
- super();
- this.useNonRandomKey = useNonRandomKey;
- }
-
- @DoFn.ProcessElement
- public void processElement(ProcessContext context) {
- Row row = context.element();
-
- Schema schemaWithoutKeyField =
- Schema.builder()
- .addFields(
- row.getSchema().getFields().stream()
- .filter(field -> !field.getName().equals(keyField))
- .collect(Collectors.toList()))
- .build();
- Entity.Builder entityBuilder =
constructEntityFromRow(schemaWithoutKeyField, row);
- entityBuilder.setKey(constructKeyFromRow(row));
-
- context.output(entityBuilder.build());
- }
-
- /**
- * Converts an entire {@code Row} to an appropriate DataStore {@code
Entity.Builder}.
- *
- * @param row {@code Row} to convert.
- * @return resulting {@code Entity.Builder}.
- */
- private Entity.Builder constructEntityFromRow(Schema schema, Row row) {
- Entity.Builder entityBuilder = Entity.newBuilder();
- for (Schema.Field field : schema.getFields()) {
- Value val = mapObjectToValue(row.getValue(field.getName()));
- entityBuilder.putProperties(field.getName(), val);
- }
- return entityBuilder;
- }
-
- /**
- * Create a random key for a {@code Row} without a keyField or use a
user-specified key by
- * parsing it from byte array when keyField is set.
- *
- * @param row {@code Row} to construct a key for.
- * @return resulting {@code Key}.
- */
- private com.google.datastore.v1.Key constructKeyFromRow(Row row) {
- if (!useNonRandomKey) {
- // When key field is not present - use key supplier to generate a
random one.
- return makeKey(kind, keySupplier.get()).build();
- }
- byte[] keyBytes = row.getBytes(keyField);
- try {
- return com.google.datastore.v1.Key.parseFrom(keyBytes);
- } catch (InvalidProtocolBufferException e) {
- throw new IllegalStateException("Failed to parse DataStore key from
bytes.");
- }
- }
-
- /**
- * Converts a {@code Row} value to an appropriate DataStore {@code
Value} object.
- *
- * @param value {@code Row} value to convert.
- * @throws IllegalStateException when no mapping function for object of
given type exists.
- * @return resulting {@code Value}.
- */
- private Value mapObjectToValue(Object value) {
- if (value == null) {
- return Value.newBuilder().build();
- }
-
- if (Boolean.class.equals(value.getClass())) {
- return makeValue((Boolean) value).build();
- } else if (Byte.class.equals(value.getClass())) {
- return makeValue((Byte) value).build();
- } else if (Long.class.equals(value.getClass())) {
- return makeValue((Long) value).build();
- } else if (Short.class.equals(value.getClass())) {
- return makeValue((Short) value).build();
- } else if (Integer.class.equals(value.getClass())) {
- return makeValue((Integer) value).build();
- } else if (Double.class.equals(value.getClass())) {
- return makeValue((Double) value).build();
- } else if (Float.class.equals(value.getClass())) {
- return makeValue((Float) value).build();
- } else if (String.class.equals(value.getClass())) {
- return makeValue((String) value).build();
- } else if (Instant.class.equals(value.getClass())) {
- return makeValue(((Instant) value).toDate()).build();
- } else if (byte[].class.equals(value.getClass())) {
- return makeValue(ByteString.copyFrom((byte[]) value)).build();
- } else if (value instanceof Row) {
- // Recursive conversion to handle nested rows.
- Row row = (Row) value;
- return makeValue(constructEntityFromRow(row.getSchema(),
row)).build();
- } else if (value instanceof Collection) {
- // Recursive to handle nested collections.
- Collection<Object> collection = (Collection<Object>) value;
- List<Value> arrayValues =
-
collection.stream().map(this::mapObjectToValue).collect(Collectors.toList());
- return makeValue(arrayValues).build();
- }
- throw new IllegalStateException(
- "No conversion exists from type: " + value.getClass() + " to
DataStove Value.");
- }
- }
- }
-}
diff --git
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
index 9ecf668..b579e06 100644
---
a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
+++
b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreV1TableProvider.java
@@ -18,13 +18,21 @@
package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
import com.google.auto.service.AutoService;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import
org.apache.beam.sdk.extensions.sql.meta.provider.InMemoryMetaTableProvider;
+import org.apache.beam.sdk.extensions.sql.impl.BeamTableStatistics;
+import
org.apache.beam.sdk.extensions.sql.meta.provider.SchemaIOTableProviderWrapper;
import org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
+import org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider;
+import
org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
/**
- * {@link TableProvider} for {@link DataStoreV1Table}.
+ * {@link TableProvider} for {@link DatastoreIO} for consumption by Beam SQL.
+ *
+ * <p>Passes the {@link DataStoreV1SchemaIOProvider} to the generalized table
provider wrapper,
+ * {@link SchemaIOTableProviderWrapper}, for DataStoreV1 specific behavior.
*
* <p>A sample of DataStoreV1Table table is:
*
@@ -39,7 +47,11 @@ import
org.apache.beam.sdk.extensions.sql.meta.provider.TableProvider;
* }</pre>
*/
@AutoService(TableProvider.class)
-public class DataStoreV1TableProvider extends InMemoryMetaTableProvider {
+public class DataStoreV1TableProvider extends SchemaIOTableProviderWrapper {
+ /** Supplies the {@link DataStoreV1SchemaIOProvider} that backs this table provider. */
+ @Override
+ public SchemaIOProvider getSchemaIOProvider() {
+ return new DataStoreV1SchemaIOProvider();
+ }
@Override
public String getTableType() {
@@ -47,7 +59,18 @@ public class DataStoreV1TableProvider extends
InMemoryMetaTableProvider {
}
@Override
- public BeamSqlTable buildBeamSqlTable(Table table) {
- return new DataStoreV1Table(table);
+ public BeamTableStatistics getTableStatistics(PipelineOptions options,
SchemaIO schemaIO) {
+ // Assumes schemaIO was produced by DataStoreV1SchemaIOProvider; any other SchemaIO
+ // fails here with a ClassCastException. NOTE(review): this override is public, so
+ // confirm every caller guarantees that invariant.
+ DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+ // Count the entities of the configured kind in the configured project. The null
+ // argument presumably means "no namespace" (default namespace) — TODO confirm.
+ long count =
+ DatastoreIO.v1()
+ .read()
+ .withProjectId(dataStoreV1SchemaIO.getProjectId())
+ .getNumEntities(options, dataStoreV1SchemaIO.getKind(), null);
+
+ // A negative count is reported as unknown bounded statistics; presumably
+ // getNumEntities signals "stats unavailable" this way — TODO confirm.
+ if (count < 0) {
+ return BeamTableStatistics.BOUNDED_UNKNOWN;
+ }
+
+ return BeamTableStatistics.createBoundedTableStatistics((double) count);
}
}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
index 3274846..c2c499a 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
+++
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreReadWriteIT.java
@@ -38,11 +38,11 @@ import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv;
import org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils;
-import
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.EntityToRow;
-import
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.RowToEntity;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryOptions;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreIO;
import org.apache.beam.sdk.io.gcp.datastore.DatastoreV1;
+import org.apache.beam.sdk.io.gcp.datastore.EntityToRow;
+import org.apache.beam.sdk.io.gcp.datastore.RowToEntity;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.testing.PAssert;
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java
b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java
deleted file mode 100644
index 879add1..0000000
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableProviderTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
-
-import static
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.DEFAULT_KEY_FIELD;
-import static
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.KEY_FIELD_PROPERTY;
-import static org.apache.beam.sdk.schemas.Schema.toSchema;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertThrows;
-import static org.junit.Assert.assertTrue;
-
-import com.alibaba.fastjson.JSON;
-import java.util.stream.Stream;
-import org.apache.beam.sdk.extensions.sql.meta.BeamSqlTable;
-import org.apache.beam.sdk.extensions.sql.meta.Table;
-import org.apache.beam.sdk.schemas.Schema;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-@RunWith(JUnit4.class)
-public class DataStoreTableProviderTest {
- private DataStoreV1TableProvider provider = new DataStoreV1TableProvider();
-
- @Test
- public void testGetTableType() {
- assertEquals("datastoreV1", provider.getTableType());
- }
-
- @Test
- public void testBuildBeamSqlTable() {
- final String location = "projectId/batch_kind";
- Table table = fakeTable("TEST", location);
- BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
-
- assertNotNull(sqlTable);
- assertTrue(sqlTable instanceof DataStoreV1Table);
-
- DataStoreV1Table datastoreTable = (DataStoreV1Table) sqlTable;
- assertEquals("projectId", datastoreTable.projectId);
- assertEquals("batch_kind", datastoreTable.kind);
- assertEquals(DEFAULT_KEY_FIELD, datastoreTable.keyField);
- }
-
- @Test
- public void testTableProperty() {
- final String location = "projectId/batch_kind";
- Table table =
- fakeTableWithProperties("TEST", location, "{ " + KEY_FIELD_PROPERTY +
": \"field_name\" }");
- BeamSqlTable sqlTable = provider.buildBeamSqlTable(table);
-
- assertNotNull(sqlTable);
- assertTrue(sqlTable instanceof DataStoreV1Table);
-
- DataStoreV1Table datastoreTable = (DataStoreV1Table) sqlTable;
- assertEquals("projectId", datastoreTable.projectId);
- assertEquals("batch_kind", datastoreTable.kind);
- assertEquals("field_name", datastoreTable.keyField);
- }
-
- @Test
- public void testTableProperty_nullValue_throwsException() {
- final String location = "projectId/batch_kind";
- Table table = fakeTableWithProperties("TEST", location, "{ " +
KEY_FIELD_PROPERTY + ": \"\" }");
- assertThrows(IllegalArgumentException.class, () ->
provider.buildBeamSqlTable(table));
- }
-
- private static Table fakeTable(String name, String location) {
- return Table.builder()
- .name(name)
- .comment(name + " table")
- .location(location)
- .schema(
- Stream.of(
- Schema.Field.nullable("id", Schema.FieldType.INT32),
- Schema.Field.nullable("name", Schema.FieldType.STRING))
- .collect(toSchema()))
- .type("datastoreV1")
- .build();
- }
-
- private static Table fakeTableWithProperties(String name, String location,
String properties) {
- return Table.builder()
- .name(name)
- .comment(name + " table")
- .location(location)
- .schema(
- Stream.of(
- Schema.Field.nullable("id", Schema.FieldType.INT32),
- Schema.Field.nullable("name", Schema.FieldType.STRING))
- .collect(toSchema()))
- .type("datastoreV1")
- .properties(JSON.parseObject(properties))
- .build();
- }
-}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProvider.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProvider.java
new file mode 100644
index 0000000..216ccf4
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProvider.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import com.google.auto.service.AutoService;
+import com.google.datastore.v1.Query;
+import java.io.Serializable;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import org.apache.beam.sdk.annotations.Internal;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.InvalidConfigurationException;
+import org.apache.beam.sdk.schemas.io.InvalidLocationException;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.schemas.io.SchemaIOProvider;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.POutput;
+import org.apache.beam.sdk.values.Row;
+
+/**
+ * An implementation of {@link SchemaIOProvider} for reading and writing payloads with {@link
+ * DatastoreIO}.
+ *
+ * <p>The location string has the form {@code projectId/kind}.
+ */
+@Internal
+@AutoService(SchemaIOProvider.class)
+public class DataStoreV1SchemaIOProvider implements SchemaIOProvider {
+  public static final String KEY_FIELD_PROPERTY = "keyField";
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  // The first group is greedy, so "a/b/c" yields projectId "a/b" and kind "c".
+  private static final Pattern LOCATION_PATTERN =
+      Pattern.compile("(?<projectId>.+)/(?<kind>.+)");
+
+  /** Returns an id that uniquely represents this IO. */
+  @Override
+  public String identifier() {
+    return "datastoreV1";
+  }
+
+  /**
+   * Returns the expected schema of the configuration object. Note this is distinct from the schema
+   * of the data source itself.
+   *
+   * <p>Configuration Parameters:
+   *
+   * <ul>
+   *   <li>STRING keyField: The name of the Beam schema field to map the DataStore entity key.
+   *       Defaults to {@code __key__} if not set or null. An empty value is rejected.
+   * </ul>
+   */
+  @Override
+  public Schema configurationSchema() {
+    return Schema.builder().addNullableField(KEY_FIELD_PROPERTY, Schema.FieldType.STRING).build();
+  }
+
+  /**
+   * Produce a SchemaIO given a String representing the data's location, the schema of the data that
+   * resides there, and some IO-specific configuration object.
+   *
+   * @throws InvalidLocationException if {@code location} is null or not of the form {@code
+   *     projectId/kind}
+   * @throws InvalidConfigurationException if the {@code keyField} property is an empty string
+   */
+  @Override
+  public DataStoreV1SchemaIO from(String location, Row configuration, Schema dataSchema) {
+    return new DataStoreV1SchemaIO(location, configuration, dataSchema);
+  }
+
+  @Override
+  public boolean requiresDataSchema() {
+    return true;
+  }
+
+  @Override
+  public PCollection.IsBounded isBounded() {
+    return PCollection.IsBounded.BOUNDED;
+  }
+
+  /** A {@link SchemaIO} that reads and writes a single DataStore kind within one project. */
+  public static class DataStoreV1SchemaIO implements SchemaIO, Serializable {
+    protected final Schema dataSchema;
+    protected final String location;
+    protected final String kind;
+    protected final String projectId;
+    protected final String keyField;
+
+    private DataStoreV1SchemaIO(String location, Row config, Schema dataSchema) {
+      this.location = location;
+      this.dataSchema = dataSchema;
+      this.keyField = determineKeyField(config.getString(KEY_FIELD_PROPERTY));
+
+      // Validation happens before matching: Pattern#matcher(null) would otherwise throw a bare
+      // NullPointerException and the "must be set" check would never be reached.
+      Matcher matcher = validateLocation(location);
+      this.kind = matcher.group("kind");
+      this.projectId = matcher.group("projectId");
+    }
+
+    @Override
+    public Schema schema() {
+      return dataSchema;
+    }
+
+    /** Reads every entity of {@code kind} in {@code projectId} and converts it to a {@link Row}. */
+    @Override
+    public PTransform<PBegin, PCollection<Row>> buildReader() {
+      return new PTransform<PBegin, PCollection<Row>>() {
+        @Override
+        public PCollection<Row> expand(PBegin begin) {
+          Query.Builder q = Query.newBuilder();
+          q.addKindBuilder().setName(kind);
+          Query query = q.build();
+
+          DatastoreV1.Read readInstance =
+              DatastoreIO.v1().read().withProjectId(projectId).withQuery(query);
+
+          return begin
+              .apply("Read Datastore Entities", readInstance)
+              .apply(
+                  "Convert Datastore Entities to Rows", EntityToRow.create(dataSchema, keyField));
+        }
+      };
+    }
+
+    /** Converts each {@link Row} to an entity of {@code kind} and writes it to {@code projectId}. */
+    @Override
+    public PTransform<PCollection<Row>, POutput> buildWriter() {
+      return new PTransform<PCollection<Row>, POutput>() {
+        @Override
+        public POutput expand(PCollection<Row> input) {
+          return input
+              .apply("Convert Rows to Datastore Entities", RowToEntity.create(keyField, kind))
+              .apply("Write Datastore Entities", DatastoreIO.v1().write().withProjectId(projectId));
+        }
+      };
+    }
+
+    /** Returns the project id parsed from the location. */
+    public String getProjectId() {
+      return projectId;
+    }
+
+    /** Returns the DataStore kind parsed from the location. */
+    public String getKind() {
+      return kind;
+    }
+
+    /**
+     * Resolves the key field name from configuration.
+     *
+     * @param configKey the configured value; {@code null} selects {@link #DEFAULT_KEY_FIELD}
+     * @return the field name to use for the entity key
+     * @throws InvalidConfigurationException if {@code configKey} is empty
+     */
+    private static String determineKeyField(String configKey) {
+      if (configKey == null) {
+        return DEFAULT_KEY_FIELD;
+      }
+      if (configKey.isEmpty()) {
+        throw new InvalidConfigurationException(
+            String.format("'%s' property cannot be empty.", KEY_FIELD_PROPERTY));
+      }
+      return configKey;
+    }
+
+    /**
+     * Checks that {@code location} is set and of the form {@code projectId/kind}.
+     *
+     * @return a matcher that has successfully matched {@code location}, ready for group lookups
+     * @throws InvalidLocationException if the location is null or malformed
+     */
+    private static Matcher validateLocation(String location) {
+      // TODO: allow users to specify a namespace in a location string.
+      if (location == null) {
+        throw new InvalidLocationException("DataStoreV1 location must be set.");
+      }
+      Matcher matcher = LOCATION_PATTERN.matcher(location);
+      if (!matcher.matches()) {
+        throw new InvalidLocationException(
+            "DataStoreV1 location must be in the following format: 'projectId/kind'"
+                + " but was: "
+                + location);
+      }
+      return matcher;
+    }
+  }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java
new file mode 100644
index 0000000..b86323c
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRow.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@code PTransform} to perform a conversion of {@link Entity} to {@link Row}. */
+public class EntityToRow extends PTransform<PCollection<Entity>, PCollection<Row>> {
+  private final Schema schema;
+  private final String keyField;
+  private static final Logger LOG = LoggerFactory.getLogger(EntityToRow.class);
+
+  /**
+   * @throws IllegalStateException if {@code schema} contains a field named {@code keyField} whose
+   *     type is not {@code BYTES}
+   */
+  private EntityToRow(Schema schema, String keyField) {
+    this.schema = schema;
+    this.keyField = keyField;
+
+    if (schema.getFieldNames().contains(keyField)) {
+      if (!schema.getField(keyField).getType().getTypeName().equals(Schema.TypeName.BYTES)) {
+        throw new IllegalStateException(
+            "Field `"
+                + keyField
+                + "` should be of type `BYTES`. Please change the type or specify a field to"
+                + " store the KEY value.");
+      }
+      LOG.info("Entity KEY will be stored under `{}` field.", keyField);
+    }
+  }
+
+  /**
+   * Create a PTransform instance.
+   *
+   * @param schema {@code Schema} of the target row.
+   * @param keyField A name of the row field to store the {@code Key} in.
+   * @return {@code PTransform} instance for Entity to Row conversion.
+   */
+  public static EntityToRow create(Schema schema, String keyField) {
+    return new EntityToRow(schema, keyField);
+  }
+
+  @Override
+  public PCollection<Row> expand(PCollection<Entity> input) {
+    return input.apply(ParDo.of(new EntityToRow.EntityToRowConverter())).setRowSchema(schema);
+  }
+
+  /** Converts each {@link Entity} (its key plus its properties) into a {@link Row}. */
+  class EntityToRowConverter extends DoFn<Entity, Row> {
+
+    @DoFn.ProcessElement
+    public void processElement(ProcessContext context) {
+      Entity entity = context.element();
+      // The entity key is exposed under keyField next to the entity's own properties.
+      // NOTE(review): ImmutableMap.Builder#build() rejects duplicate keys, so an entity that
+      // also has a property named keyField fails here.
+      ImmutableMap.Builder<String, Value> mapBuilder = ImmutableMap.builder();
+      mapBuilder.put(keyField, makeValue(entity.getKey()).build());
+      mapBuilder.putAll(entity.getPropertiesMap());
+
+      context.output(extractRowFromProperties(schema, mapBuilder.build()));
+    }
+
+    /**
+     * Convert DataStore {@code Value} to Beam type.
+     *
+     * @param currentFieldType Beam {@code Schema.FieldType} to convert to (used for {@code Row} and
+     *     {@code Array}).
+     * @param val DataStore {@code Value}; must not be {@code null}.
+     * @return resulting Beam type.
+     * @throws IllegalStateException for value types with no Beam mapping (e.g. geo points).
+     */
+    private Object convertValueToObject(Schema.FieldType currentFieldType, Value val) {
+      Value.ValueTypeCase typeCase = val.getValueTypeCase();
+
+      switch (typeCase) {
+        case NULL_VALUE:
+        case VALUETYPE_NOT_SET:
+          return null;
+        case BOOLEAN_VALUE:
+          return val.getBooleanValue();
+        case INTEGER_VALUE:
+          return val.getIntegerValue();
+        case DOUBLE_VALUE:
+          return val.getDoubleValue();
+        case TIMESTAMP_VALUE:
+          com.google.protobuf.Timestamp time = val.getTimestampValue();
+          // Timestamp nanos are the sub-second part in nanoseconds: 1 ms = 1,000,000 ns.
+          long millis = time.getSeconds() * 1000 + time.getNanos() / 1_000_000;
+          return Instant.ofEpochMilli(millis).toDateTime();
+        case STRING_VALUE:
+          return val.getStringValue();
+        case KEY_VALUE:
+          // Keys are carried as their serialized protobuf bytes.
+          return val.getKeyValue().toByteArray();
+        case BLOB_VALUE:
+          return val.getBlobValue().toByteArray();
+        case ENTITY_VALUE:
+          // Recursive mapping for row type.
+          Schema rowSchema = currentFieldType.getRowSchema();
+          assert rowSchema != null;
+          Entity entity = val.getEntityValue();
+          return extractRowFromProperties(rowSchema, entity.getPropertiesMap());
+        case ARRAY_VALUE:
+          // Recursive mapping for collection type.
+          Schema.FieldType elementType = currentFieldType.getCollectionElementType();
+          List<Value> valueList = val.getArrayValue().getValuesList();
+          return valueList.stream()
+              .map(v -> convertValueToObject(elementType, v))
+              .collect(Collectors.toList());
+        case GEO_POINT_VALUE:
+        default:
+          throw new IllegalStateException(
+              "No conversion exists from type: "
+                  + val.getValueTypeCase().name()
+                  + " to Beam type.");
+      }
+    }
+
+    /**
+     * Converts all properties of an {@code Entity} to Beam {@code Row}.
+     *
+     * @param schema Target row {@code Schema}.
+     * @param values A map of property names and values.
+     * @return resulting Beam {@code Row}.
+     */
+    private Row extractRowFromProperties(Schema schema, Map<String, Value> values) {
+      Row.Builder builder = Row.withSchema(schema);
+      // It is not a guarantee that the values will be in the same order as the schema.
+      // Maybe metadata:
+      // https://cloud.google.com/appengine/docs/standard/python/datastore/metadataqueries
+      // TODO: figure out in what order the elements are in (without relying on Beam schema).
+      for (Schema.Field field : schema.getFields()) {
+        Value val = values.get(field.getName());
+        // A schema field with no matching entity property becomes null instead of an NPE.
+        builder.addValue(val == null ? null : convertValueToObject(field.getType(), val));
+      }
+      return builder.build();
+    }
+  }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java
new file mode 100644
index 0000000..02ca67c
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/datastore/RowToEntity.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
+import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
+
+import com.google.datastore.v1.Entity;
+import com.google.datastore.v1.Value;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@code PTransform} to perform a conversion of {@link Row} to {@link Entity}. */
+public class RowToEntity extends PTransform<PCollection<Row>, PCollection<Entity>> {
+  private final Supplier<String> keySupplier;
+  private final String kind;
+  private final String keyField;
+  private static final Logger LOG = LoggerFactory.getLogger(RowToEntity.class);
+
+  private RowToEntity(Supplier<String> keySupplier, String kind, String keyField) {
+    this.keySupplier = keySupplier;
+    this.kind = kind;
+    this.keyField = keyField;
+  }
+
+  /**
+   * Validates the key field (when the input schema has one) and converts every row to an entity.
+   *
+   * @throws IllegalStateException if the input schema has a field named {@code keyField} whose
+   *     type is not {@code BYTES}
+   */
+  @Override
+  public PCollection<Entity> expand(PCollection<Row> input) {
+    Schema inputSchema = input.getSchema();
+    boolean isFieldPresent = inputSchema.getFieldNames().contains(keyField);
+    if (isFieldPresent) {
+      if (!inputSchema.getField(keyField).getType().getTypeName().equals(Schema.TypeName.BYTES)) {
+        throw new IllegalStateException(
+            "Field `"
+                + keyField
+                + "` should be of type `BYTES`. Please change the type or specify a field to"
+                + " write the KEY value from.");
+      }
+      LOG.info("Field to use as Entity KEY is set to: `{}`.", keyField);
+    }
+    return input.apply(ParDo.of(new RowToEntity.RowToEntityConverter(isFieldPresent)));
+  }
+
+  /**
+   * Create a PTransform instance.
+   *
+   * @param keyField Row field containing a serialized {@code Key}, must be set when using user
+   *     specified keys.
+   * @param kind DataStore `Kind` data will be written to (required when generating random {@code
+   *     Key}s).
+   * @return {@code PTransform} instance for Row to Entity conversion.
+   */
+  public static RowToEntity create(String keyField, String kind) {
+    return new RowToEntity(
+        (Supplier<String> & Serializable) () -> UUID.randomUUID().toString(), kind, keyField);
+  }
+
+  /** Like {@link #create} but every generated key uses the fixed name {@code keyString}. */
+  public static RowToEntity createTest(String keyString, String keyField, String kind) {
+    return new RowToEntity((Supplier<String> & Serializable) () -> keyString, kind, keyField);
+  }
+
+  /** Converts each {@link Row} into an {@link Entity}, excluding {@code keyField} from properties. */
+  class RowToEntityConverter extends DoFn<Row, Entity> {
+    private final boolean useNonRandomKey;
+    // Input schema minus keyField, derived from the first element and reused afterwards (rows of
+    // one PCollection are expected to share its schema). Transient: rebuilt after deserialization.
+    private transient Schema schemaWithoutKeyField;
+
+    RowToEntityConverter(boolean useNonRandomKey) {
+      super();
+      this.useNonRandomKey = useNonRandomKey;
+    }
+
+    @DoFn.ProcessElement
+    public void processElement(ProcessContext context) {
+      Row row = context.element();
+      if (schemaWithoutKeyField == null) {
+        schemaWithoutKeyField = removeKeyField(row.getSchema());
+      }
+      Entity.Builder entityBuilder = constructEntityFromRow(schemaWithoutKeyField, row);
+      entityBuilder.setKey(constructKeyFromRow(row));
+
+      context.output(entityBuilder.build());
+    }
+
+    /** Returns a copy of {@code schema} without the field named {@code keyField}. */
+    private Schema removeKeyField(Schema schema) {
+      return Schema.builder()
+          .addFields(
+              schema.getFields().stream()
+                  .filter(field -> !field.getName().equals(keyField))
+                  .collect(Collectors.toList()))
+          .build();
+    }
+
+    /**
+     * Converts the fields of {@code schema} read from a {@code Row} to a DataStore {@code
+     * Entity.Builder}.
+     *
+     * @param schema fields to copy from {@code row} (looked up by name).
+     * @param row {@code Row} to convert.
+     * @return resulting {@code Entity.Builder}.
+     */
+    private Entity.Builder constructEntityFromRow(Schema schema, Row row) {
+      Entity.Builder entityBuilder = Entity.newBuilder();
+      for (Schema.Field field : schema.getFields()) {
+        Value val = mapObjectToValue(row.getValue(field.getName()));
+        entityBuilder.putProperties(field.getName(), val);
+      }
+      return entityBuilder;
+    }
+
+    /**
+     * Create a random key for a {@code Row} without a keyField or use a user-specified key by
+     * parsing it from byte array when keyField is set.
+     *
+     * @param row {@code Row} to construct a key for.
+     * @return resulting {@code Key}.
+     * @throws IllegalStateException if the key field is null or does not hold a valid key.
+     */
+    private com.google.datastore.v1.Key constructKeyFromRow(Row row) {
+      if (!useNonRandomKey) {
+        // When key field is not present - use key supplier to generate a random one.
+        return makeKey(kind, keySupplier.get()).build();
+      }
+      byte[] keyBytes = row.getBytes(keyField);
+      if (keyBytes == null) {
+        throw new IllegalStateException("Field `" + keyField + "` holds no DataStore key bytes.");
+      }
+      try {
+        return com.google.datastore.v1.Key.parseFrom(keyBytes);
+      } catch (InvalidProtocolBufferException e) {
+        throw new IllegalStateException("Failed to parse DataStore key from bytes.", e);
+      }
+    }
+
+    /**
+     * Converts a {@code Row} value to an appropriate DataStore {@code Value} object.
+     *
+     * @param value {@code Row} value to convert.
+     * @return resulting {@code Value}.
+     * @throws IllegalStateException when no mapping function for object of given type exists.
+     */
+    private Value mapObjectToValue(Object value) {
+      if (value == null) {
+        return Value.newBuilder().build();
+      }
+
+      if (Boolean.class.equals(value.getClass())) {
+        return makeValue((Boolean) value).build();
+      } else if (Byte.class.equals(value.getClass())) {
+        return makeValue((Byte) value).build();
+      } else if (Long.class.equals(value.getClass())) {
+        return makeValue((Long) value).build();
+      } else if (Short.class.equals(value.getClass())) {
+        return makeValue((Short) value).build();
+      } else if (Integer.class.equals(value.getClass())) {
+        return makeValue((Integer) value).build();
+      } else if (Double.class.equals(value.getClass())) {
+        return makeValue((Double) value).build();
+      } else if (Float.class.equals(value.getClass())) {
+        return makeValue((Float) value).build();
+      } else if (String.class.equals(value.getClass())) {
+        return makeValue((String) value).build();
+      } else if (value instanceof org.joda.time.ReadableInstant) {
+        // Accept any Joda instant (Instant, DateTime, ...), not only the exact Instant class.
+        Instant instant = ((org.joda.time.ReadableInstant) value).toInstant();
+        return makeValue(instant.toDate()).build();
+      } else if (byte[].class.equals(value.getClass())) {
+        return makeValue(ByteString.copyFrom((byte[]) value)).build();
+      } else if (value instanceof Row) {
+        // Recursive conversion to handle nested rows.
+        Row row = (Row) value;
+        return makeValue(constructEntityFromRow(row.getSchema(), row)).build();
+      } else if (value instanceof Collection) {
+        // Recursive to handle nested collections.
+        Collection<?> collection = (Collection<?>) value;
+        List<Value> arrayValues =
+            collection.stream().map(this::mapObjectToValue).collect(Collectors.toList());
+        return makeValue(arrayValues).build();
+      }
+      throw new IllegalStateException(
+          "No conversion exists from type: " + value.getClass() + " to DataStore Value.");
+    }
+  }
+}
diff --git
a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
new file mode 100644
index 0000000..1474744
--- /dev/null
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/DataStoreV1SchemaIOProviderTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.datastore;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import
org.apache.beam.sdk.io.gcp.datastore.DataStoreV1SchemaIOProvider.DataStoreV1SchemaIO;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.schemas.io.SchemaIO;
+import org.apache.beam.sdk.values.Row;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class DataStoreV1SchemaIOProviderTest {
+  // The documented default, spelled out independently of the provider's own constant.
+  static final String DEFAULT_KEY_FIELD = "__key__";
+  public static final String KEY_FIELD_PROPERTY = DataStoreV1SchemaIOProvider.KEY_FIELD_PROPERTY;
+  private static final String LOCATION = "projectId/batch_kind";
+  private final DataStoreV1SchemaIOProvider provider = new DataStoreV1SchemaIOProvider();
+
+  @Test
+  public void testGetTableType() {
+    assertEquals("datastoreV1", provider.identifier());
+  }
+
+  @Test
+  public void testBuildBeamSqlTable() {
+    SchemaIO schemaIO = provider.from(LOCATION, configWithKeyField(null), generateDataSchema());
+
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals(DEFAULT_KEY_FIELD, dataStoreV1SchemaIO.keyField);
+  }
+
+  @Test
+  public void testTableProperty() {
+    SchemaIO schemaIO =
+        provider.from(LOCATION, configWithKeyField("field_name"), generateDataSchema());
+
+    assertNotNull(schemaIO);
+    assertTrue(schemaIO instanceof DataStoreV1SchemaIO);
+
+    DataStoreV1SchemaIO dataStoreV1SchemaIO = (DataStoreV1SchemaIO) schemaIO;
+    assertEquals("projectId", dataStoreV1SchemaIO.projectId);
+    assertEquals("batch_kind", dataStoreV1SchemaIO.kind);
+    assertEquals("field_name", dataStoreV1SchemaIO.keyField);
+  }
+
+  /** An empty (not null) key field is rejected; null falls back to the default. */
+  @Test
+  public void testTableProperty_emptyValue_throwsException() {
+    Row configuration = configWithKeyField("");
+
+    assertThrows(
+        org.apache.beam.sdk.schemas.io.InvalidConfigurationException.class,
+        () -> provider.from(LOCATION, configuration, generateDataSchema()));
+  }
+
+  @Test
+  public void testInvalidLocation_throwsException() {
+    Row configuration = configWithKeyField(null);
+
+    assertThrows(
+        org.apache.beam.sdk.schemas.io.InvalidLocationException.class,
+        () -> provider.from("projectIdWithoutKind", configuration, generateDataSchema()));
+  }
+
+  /** Builds a configuration row whose key field property is {@code keyField} (may be null). */
+  private Row configWithKeyField(String keyField) {
+    return Row.withSchema(provider.configurationSchema())
+        .withFieldValue(KEY_FIELD_PROPERTY, keyField)
+        .build();
+  }
+
+  private static Schema generateDataSchema() {
+    return Schema.builder()
+        .addNullableField("id", Schema.FieldType.INT32)
+        .addNullableField("name", Schema.FieldType.STRING)
+        .build();
+  }
+}
diff --git
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java
similarity index 90%
rename from
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java
rename to
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java
index a682b2a..32090bc 100644
---
a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datastore/DataStoreTableTest.java
+++
b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/datastore/EntityToRowRowToEntityTest.java
@@ -15,13 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.beam.sdk.extensions.sql.meta.provider.datastore;
+package org.apache.beam.sdk.io.gcp.datastore;
import static com.google.datastore.v1.client.DatastoreHelper.makeKey;
import static com.google.datastore.v1.client.DatastoreHelper.makeValue;
-import static
org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.VARBINARY;
-import static
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.DEFAULT_KEY_FIELD;
-import static
org.apache.beam.sdk.extensions.sql.utils.DateTimeUtils.parseTimestampWithUTCTimeZone;
import static org.apache.beam.sdk.schemas.Schema.FieldType.BOOLEAN;
import static org.apache.beam.sdk.schemas.Schema.FieldType.BYTES;
import static org.apache.beam.sdk.schemas.Schema.FieldType.DATETIME;
@@ -39,9 +36,6 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.stream.Collectors;
-import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
-import
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.EntityToRow;
-import
org.apache.beam.sdk.extensions.sql.meta.provider.datastore.DataStoreV1Table.RowToEntity;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.testing.PAssert;
@@ -50,17 +44,20 @@ import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.joda.time.DateTime;
+import org.joda.time.format.DateTimeFormat;
import org.junit.Rule;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
-public class DataStoreTableTest {
+public class EntityToRowRowToEntityTest {
private static final String KIND = "kind";
private static final String UUID_VALUE = UUID.randomUUID().toString();
private static final Key.Builder KEY = makeKey(KIND, UUID_VALUE);
private static final DateTime DATE_TIME =
parseTimestampWithUTCTimeZone("2018-05-28 20:17:40");
+ // Expected default key field name; DataStoreV1SchemaIOProvider.DEFAULT_KEY_FIELD is also "__key__".
+ static final String DEFAULT_KEY_FIELD = "__key__";
+ // Local stand-in for the SQL extension's CalciteUtils.VARBINARY, which this test no longer
+ // imports; here it is simply the Beam BYTES field type.
+ private static final FieldType VARBINARY = FieldType.BYTES;
private static final Schema NESTED_ROW_SCHEMA =
Schema.builder().addNullableField("nestedLong", INT64).build();
@@ -74,7 +71,7 @@ public class DataStoreTableTest {
.addNullableField("rowArray",
array(FieldType.row(NESTED_ROW_SCHEMA)))
.addNullableField("double", DOUBLE)
.addNullableField("bytes", BYTES)
- .addNullableField("string", CalciteUtils.CHAR)
+ .addNullableField("string", STRING)
.addNullableField("nullable", INT64)
.build();
private static final Entity NESTED_ENTITY =
@@ -187,4 +184,12 @@ public class DataStoreTableTest {
private static Row row(Schema schema, Object... values) {
return Row.withSchema(schema).addValues(values).build();
}
+
+  /**
+   * Parses {@code yyyy-MM-dd HH:mm:ss} as a UTC {@link DateTime}; when the input contains a '.',
+   * the {@code yyyy-MM-dd HH:mm:ss.SSS} pattern (with milliseconds) is used instead.
+   */
+  public static DateTime parseTimestampWithUTCTimeZone(String str) {
+    boolean hasMillis = str.indexOf('.') != -1;
+    String pattern = hasMillis ? "yyyy-MM-dd HH:mm:ss.SSS" : "yyyy-MM-dd HH:mm:ss";
+    return DateTimeFormat.forPattern(pattern).withZoneUTC().parseDateTime(str);
+  }
}