This is an automated email from the ASF dual-hosted git repository.
mbae pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new c986168 [BEAM-11289] [Java] Integrate Google Cloud Recommendations AI
functionality
c986168 is described below
commit c986168f992ce8c51ef83500b2b9db91cfe3eaf5
Author: Matthias Baetens <[email protected]>
AuthorDate: Sat Jun 19 15:37:34 2021 +0100
[BEAM-11289] [Java] Integrate Google Cloud Recommendations AI functionality
---
sdks/java/extensions/ml/build.gradle | 12 ++
.../ml/RecommendationAICreateCatalogItem.java | 134 +++++++++++++
.../beam/sdk/extensions/ml/RecommendationAIIO.java | 51 +++++
.../ml/RecommendationAIImportCatalogItems.java | 200 +++++++++++++++++++
.../ml/RecommendationAIImportUserEvents.java | 216 +++++++++++++++++++++
.../sdk/extensions/ml/RecommendationAIPredict.java | 156 +++++++++++++++
.../ml/RecommendationAIWriteUserEvent.java | 146 ++++++++++++++
.../sdk/extensions/ml/DelegatingAtomicCoder.java | 53 +++++
.../beam/sdk/extensions/ml/GenericJsonCoder.java | 64 ++++++
.../ml/RecommendationAICatalogItemIT.java | 119 ++++++++++++
.../extensions/ml/RecommendationAIPredictIT.java | 80 ++++++++
.../extensions/ml/RecommendationAIUserEventIT.java | 111 +++++++++++
12 files changed, 1342 insertions(+)
diff --git a/sdks/java/extensions/ml/build.gradle
b/sdks/java/extensions/ml/build.gradle
index b231e8e..a8acfe7 100644
--- a/sdks/java/extensions/ml/build.gradle
+++ b/sdks/java/extensions/ml/build.gradle
@@ -31,13 +31,19 @@ dependencies {
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":sdks:java:expansion-service")
permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
+ compile library.java.google_http_client
+ compile 'com.google.cloud:google-cloud-recommendations-ai:0.3.7'
compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
compile 'com.google.cloud:google-cloud-dlp:1.1.4'
compile 'com.google.cloud:google-cloud-language:1.99.4'
+ compile library.java.protobuf_java_util
+ compile group: 'org.json', name: 'json', version: '20201115'
compile 'com.google.api.grpc:proto-google-cloud-dlp-v2:1.1.4'
compile 'com.google.api.grpc:proto-google-cloud-language-v1:1.81.4'
compile
'com.google.api.grpc:proto-google-cloud-video-intelligence-v1:1.2.0'
compile 'com.google.api.grpc:proto-google-cloud-vision-v1:1.81.3'
+ compile
'com.google.api.grpc:proto-google-cloud-recommendations-ai-v1beta1:0.3.7'
+ compile library.java.joda_time
compile library.java.auto_value_annotations
compile library.java.gax
compile library.java.protobuf_java
@@ -46,6 +52,12 @@ dependencies {
testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
compile 'com.google.cloud:google-cloud-vision:1.99.3'
testCompile library.java.mockito_core
+ testCompile library.java.google_http_client
+ testCompile library.java.protobuf_java_util
+ testCompile group: 'org.json', name: 'json', version: '20201115'
+ testCompile 'com.google.cloud:google-cloud-recommendations-ai:0.3.7'
+ testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+ testCompile 'com.google.cloud:google-cloud-dlp:1.1.4'
testCompile project(path:
":sdks:java:extensions:google-cloud-platform-core", configuration:
"testRuntime")
testRuntimeOnly project(path: ":runners:direct-java", configuration:
"shadow")
testRuntimeOnly project(":runners:google-cloud-dataflow-java")
diff --git
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAICreateCatalogItem.java
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAICreateCatalogItem.java
new file mode 100644
index 0000000..55c5aba
--- /dev/null
+++
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAICreateCatalogItem.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.CatalogItem;
+import com.google.cloud.recommendationengine.v1beta1.CatalogName;
+import com.google.cloud.recommendationengine.v1beta1.CatalogServiceClient;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} that creates {@link CatalogItem}s through the Recommendations AI API
+ * (https://cloud.google.com/recommendations).
+ *
+ * <p>Every {@link GenericJson} of the input {@link PCollection} is converted into a
+ * {@link CatalogItem} and sent to the service. The resulting {@link PCollectionTuple} contains
+ * the successfully created catalog items and the ones whose creation failed.
+ *
+ * <p>The catalog that receives the items can be provided by name (defaults to
+ * "default_catalog").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAICreateCatalogItem
+    extends PTransform<PCollection<GenericJson>, PCollectionTuple> {
+
+  /** Output tag for the {@link CatalogItem}s returned by a successful create call. */
+  public static final TupleTag<CatalogItem> SUCCESS_TAG = new TupleTag<CatalogItem>() {};
+
+  /** Output tag for the {@link CatalogItem}s whose create call raised an {@link ApiException}. */
+  public static final TupleTag<CatalogItem> FAILURE_TAG = new TupleTag<CatalogItem>() {};
+
+  /** Creates a builder whose catalog name is preset to "default_catalog". */
+  static Builder newBuilder() {
+    return new AutoValue_RecommendationAICreateCatalogItem.Builder()
+        .setCatalogName("default_catalog");
+  }
+
+  abstract Builder toBuilder();
+
+  /** @return ID of Google Cloud project to be used for creating catalog items. */
+  public abstract @Nullable String projectId();
+
+  /**
+   * @return Name of the catalog where the catalog items will be created (defaults to
+   *     "default_catalog").
+   */
+  public abstract @Nullable String catalogName();
+
+  /** Returns a copy of this transform that targets the given Google Cloud project. */
+  public RecommendationAICreateCatalogItem withProjectId(String projectId) {
+    return this.toBuilder().setProjectId(projectId).build();
+  }
+
+  /** Returns a copy of this transform that targets the given catalog. */
+  public RecommendationAICreateCatalogItem withCatalogName(String catalogName) {
+    return this.toBuilder().setCatalogName(catalogName).build();
+  }
+
+  /**
+   * Converts the contents of the input PCollection into {@link CatalogItem}s and calls the
+   * Recommendation AI service to create each of them.
+   *
+   * @param input input PCollection
+   * @return PCollectionTuple with successful and failed {@link CatalogItem}s
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<GenericJson> input) {
+    CreateCatalogItem createFn = new CreateCatalogItem(projectId(), catalogName());
+    return input.apply(
+        ParDo.of(createFn).withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    /** @param projectId ID of Google Cloud project to be used for creating catalog items. */
+    public abstract Builder setProjectId(@Nullable String projectId);
+
+    /** @param catalogName Name of the catalog where the catalog items will be created. */
+    public abstract Builder setCatalogName(@Nullable String catalogName);
+
+    public abstract RecommendationAICreateCatalogItem build();
+  }
+
+  /** {@link DoFn} issuing one create request per input element. */
+  private static class CreateCatalogItem extends DoFn<GenericJson, CatalogItem> {
+    private final String projectId;
+    private final String catalogName;
+
+    /**
+     * @param projectId ID of GCP project to be used for creating catalog items.
+     * @param catalogName Catalog name for CatalogItem creation.
+     */
+    private CreateCatalogItem(String projectId, String catalogName) {
+      this.projectId = projectId;
+      this.catalogName = catalogName;
+    }
+
+    /**
+     * Parses the element into a {@link CatalogItem} and asks the service to create it. The item
+     * returned by the service goes to {@link #SUCCESS_TAG}; when the call raises an {@link
+     * ApiException} the parsed input item goes to {@link #FAILURE_TAG} instead.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext ctx) throws IOException {
+      CatalogName parent = CatalogName.of(projectId, "global", catalogName);
+      CatalogItem item = toCatalogItem(ctx.element());
+
+      // A client is opened for this element and closed again by try-with-resources.
+      try (CatalogServiceClient client = CatalogServiceClient.create()) {
+        CatalogItem created = client.createCatalogItem(parent, item);
+        ctx.output(SUCCESS_TAG, created);
+      } catch (ApiException e) {
+        ctx.output(FAILURE_TAG, item);
+      }
+    }
+
+    /** Serializes the JSON element to text and merges it into a fresh {@link CatalogItem}. */
+    private static CatalogItem toCatalogItem(GenericJson json) throws IOException {
+      CatalogItem.Builder builder = CatalogItem.newBuilder();
+      String serialized = (new JSONObject(json)).toString();
+      JsonFormat.parser().merge(serialized, builder);
+      return builder.build();
+    }
+  }
+}
diff --git
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIIO.java
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIIO.java
new file mode 100644
index 0000000..48ba7d6
--- /dev/null
+++
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIIO.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+/**
+ * The RecommendationAIIO class acts as a wrapper around the {@link
+ * org.apache.beam.sdk.transforms.PTransform}s that interact with the Recommendation AI API
+ * (https://cloud.google.com/recommendations).
+ *
+ * <p>Every factory method returns the transform produced by that transform's own {@code
+ * newBuilder().build()}. More information can be found on:
+ *
+ * <ul>
+ *   <li>Writing catalog items using {@link RecommendationAICreateCatalogItem}
+ *   <li>Importing catalog items using {@link RecommendationAIImportCatalogItems}
+ *   <li>Writing user events using {@link RecommendationAIWriteUserEvent}
+ *   <li>Importing user events using {@link RecommendationAIImportUserEvents}
+ *   <li>Making predictions using {@link RecommendationAIPredict}
+ * </ul>
+ */
+public class RecommendationAIIO {
+
+ /** @return a {@link RecommendationAICreateCatalogItem} built from its builder's initial state. */
+ public static RecommendationAICreateCatalogItem createCatalogItems() {
+ return RecommendationAICreateCatalogItem.newBuilder().build();
+ }
+
+ /** @return a {@link RecommendationAIImportCatalogItems} built from its builder's initial state. */
+ public static RecommendationAIImportCatalogItems importCatalogItems() {
+ return RecommendationAIImportCatalogItems.newBuilder().build();
+ }
+
+ /** @return a {@link RecommendationAIWriteUserEvent} built from its builder's initial state. */
+ public static RecommendationAIWriteUserEvent writeUserEvent() {
+ return RecommendationAIWriteUserEvent.newBuilder().build();
+ }
+
+ /** @return a {@link RecommendationAIImportUserEvents} built from its builder's initial state. */
+ public static RecommendationAIImportUserEvents importUserEvents() {
+ return RecommendationAIImportUserEvents.newBuilder().build();
+ }
+
+ /** @return a {@link RecommendationAIPredict} built from its builder's initial state. */
+ public static RecommendationAIPredict predictAll() {
+ return RecommendationAIPredict.newBuilder().build();
+ }
+}
diff --git
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java
new file mode 100644
index 0000000..1bea407
--- /dev/null
+++
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.CatalogInlineSource;
+import com.google.cloud.recommendationengine.v1beta1.CatalogItem;
+import com.google.cloud.recommendationengine.v1beta1.CatalogName;
+import com.google.cloud.recommendationengine.v1beta1.CatalogServiceClient;
+import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsRequest;
+import
com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsResponse;
+import com.google.cloud.recommendationengine.v1beta1.InputConfig;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} connecting to the Recommendations AI API
+ * (https://cloud.google.com/recommendations) and importing {@link CatalogItem}s in batches.
+ *
+ * <p>Batch size defines how many items are imported at once per request (max: 5000, which is
+ * also the default).
+ *
+ * <p>The transform consumes {@link KV} of {@link String} and {@link GenericJson}s (assumed to be
+ * the catalog item id as key and contents as value) and outputs a PCollectionTuple which will
+ * contain the successfully imported and failed catalog items.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the catalog items
+ * (defaults to "default_catalog").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIImportCatalogItems
+    extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> {
+
+  /** Output tag for the {@link CatalogItem}s of batches that were imported without errors. */
+  public static final TupleTag<CatalogItem> SUCCESS_TAG = new TupleTag<CatalogItem>() {};
+
+  /** Output tag for the {@link CatalogItem}s of batches whose import reported errors. */
+  public static final TupleTag<CatalogItem> FAILURE_TAG = new TupleTag<CatalogItem>() {};
+
+  private static final String DEFAULT_CATALOG_NAME = "default_catalog";
+  private static final int DEFAULT_BATCH_SIZE = 5000;
+
+  /**
+   * Creates a builder with every required property preset, so that {@code newBuilder().build()}
+   * succeeds: the documented default catalog, the maximum batch size, and a zero max buffering
+   * duration (which {@link GroupIntoBatches} treats as "no time limit").
+   */
+  static Builder newBuilder() {
+    return new AutoValue_RecommendationAIImportCatalogItems.Builder()
+        .setCatalogName(DEFAULT_CATALOG_NAME)
+        .setBatchSize(DEFAULT_BATCH_SIZE)
+        .setMaxBufferingDuration(Duration.ZERO);
+  }
+
+  abstract Builder toBuilder();
+
+  /** @return ID of Google Cloud project to be used for creating catalog items. */
+  public abstract @Nullable String projectId();
+
+  /** @return Name of the catalog where the catalog items will be created. */
+  public abstract @Nullable String catalogName();
+
+  /** @return Size of input elements batch to be sent in one request. */
+  public abstract Integer batchSize();
+
+  /**
+   * @return Time limit (in processing time) on how long an incomplete batch of elements is
+   *     allowed to be buffered.
+   */
+  public abstract Duration maxBufferingDuration();
+
+  /** Returns a copy of this transform that targets the given Google Cloud project. */
+  public RecommendationAIImportCatalogItems withProjectId(String projectId) {
+    return this.toBuilder().setProjectId(projectId).build();
+  }
+
+  /** Returns a copy of this transform that targets the given catalog. */
+  public RecommendationAIImportCatalogItems withCatalogName(String catalogName) {
+    return this.toBuilder().setCatalogName(catalogName).build();
+  }
+
+  /** Returns a copy of this transform that sends at most {@code batchSize} items per request. */
+  public RecommendationAIImportCatalogItems withBatchSize(Integer batchSize) {
+    return this.toBuilder().setBatchSize(batchSize).build();
+  }
+
+  /**
+   * Returns a copy of this transform that flushes an incomplete batch after the given processing
+   * time has elapsed.
+   */
+  public RecommendationAIImportCatalogItems withMaxBufferingDuration(
+      Duration maxBufferingDuration) {
+    return this.toBuilder().setMaxBufferingDuration(maxBufferingDuration).build();
+  }
+
+  /**
+   * The transform groups the input into batches, converts each batch into {@link CatalogItem}s
+   * and then calls the Recommendation AI service to import them.
+   *
+   * @param input input PCollection
+   * @return PCollectionTuple with successful and failed {@link CatalogItem}s
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) {
+    return input
+        .apply(
+            "Batch Contents",
+            GroupIntoBatches.<String, GenericJson>ofSize(batchSize())
+                .withMaxBufferingDuration(maxBufferingDuration())
+                .withShardedKey())
+        .apply(
+            "Import CatalogItems",
+            ParDo.of(new ImportCatalogItems(projectId(), catalogName()))
+                .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    /** @param projectId ID of Google Cloud project to be used for creating catalog items. */
+    public abstract Builder setProjectId(@Nullable String projectId);
+
+    /** @param catalogName Name of the catalog where the catalog items will be created. */
+    public abstract Builder setCatalogName(@Nullable String catalogName);
+
+    /**
+     * @param batchSize Amount of input elements to be sent to Recommendation AI service in one
+     *     request.
+     */
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    /**
+     * @param maxBufferingDuration Time limit (in processing time) on how long an incomplete batch
+     *     of elements is allowed to be buffered.
+     */
+    public abstract Builder setMaxBufferingDuration(Duration maxBufferingDuration);
+
+    public abstract RecommendationAIImportCatalogItems build();
+  }
+
+  /** {@link DoFn} issuing one inline import request per batch of elements. */
+  private static class ImportCatalogItems
+      extends DoFn<KV<ShardedKey<String>, Iterable<GenericJson>>, CatalogItem> {
+    private final String projectId;
+    private final String catalogName;
+
+    /**
+     * @param projectId ID of GCP project to be used for creating catalog items.
+     * @param catalogName Catalog name for CatalogItem creation.
+     */
+    private ImportCatalogItems(String projectId, String catalogName) {
+      this.projectId = projectId;
+      this.catalogName = catalogName;
+    }
+
+    /**
+     * Parses every element of the batch into a {@link CatalogItem}, imports the batch with one
+     * request and emits all of its items on a single tag: {@link #FAILURE_TAG} when the response
+     * carries error samples or the call raises an {@link ApiException}, {@link #SUCCESS_TAG}
+     * otherwise. An {@link ExecutionException} from waiting on the operation is not caught here.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c)
+        throws IOException, ExecutionException, InterruptedException {
+      CatalogName parent = CatalogName.of(projectId, "global", catalogName);
+
+      ArrayList<CatalogItem> catalogItems = new ArrayList<>();
+      for (GenericJson element : c.element().getValue()) {
+        CatalogItem.Builder catalogItemBuilder = CatalogItem.newBuilder();
+        JsonFormat.parser().merge((new JSONObject(element)).toString(), catalogItemBuilder);
+        catalogItems.add(catalogItemBuilder.build());
+      }
+      CatalogInlineSource catalogInlineSource =
+          CatalogInlineSource.newBuilder().addAllCatalogItems(catalogItems).build();
+
+      InputConfig inputConfig =
+          InputConfig.newBuilder().mergeCatalogInlineSource(catalogInlineSource).build();
+      ImportCatalogItemsRequest request =
+          ImportCatalogItemsRequest.newBuilder()
+              .setParent(parent.toString())
+              .setInputConfig(inputConfig)
+              .build();
+
+      TupleTag<CatalogItem> outcome;
+      try (CatalogServiceClient catalogServiceClient = CatalogServiceClient.create()) {
+        ImportCatalogItemsResponse response =
+            catalogServiceClient.importCatalogItemsAsync(request).get();
+        // The response does not say which items failed, so the whole batch shares one outcome.
+        outcome = response.getErrorSamplesCount() > 0 ? FAILURE_TAG : SUCCESS_TAG;
+      } catch (ApiException e) {
+        // A failed API call must never be reported as a successful import.
+        outcome = FAILURE_TAG;
+      }
+      for (CatalogItem ci : catalogItems) {
+        c.output(outcome, ci);
+      }
+    }
+  }
+}
diff --git
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java
new file mode 100644
index 0000000..762c379
--- /dev/null
+++
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.EventStoreName;
+import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsRequest;
+import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsResponse;
+import com.google.cloud.recommendationengine.v1beta1.InputConfig;
+import com.google.cloud.recommendationengine.v1beta1.UserEvent;
+import com.google.cloud.recommendationengine.v1beta1.UserEventInlineSource;
+import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} connecting to the Recommendations AI API
+ * (https://cloud.google.com/recommendations) and importing {@link UserEvent}s in batches.
+ *
+ * <p>Batch size defines how many user events are imported at once per request (max: 5000, which
+ * is also the default).
+ *
+ * <p>The transform consumes {@link KV} of {@link String} and {@link GenericJson}s (assumed to be
+ * the user event id as key and contents as value) and outputs a PCollectionTuple which will
+ * contain the successfully imported and failed user events.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the user events (defaults
+ * to "default_catalog"). It is possible to provide an event store to which you want to add the
+ * user events (defaults to "default_event_store").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIImportUserEvents
+    extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> {
+
+  /** Output tag for the {@link UserEvent}s of batches that were imported without errors. */
+  public static final TupleTag<UserEvent> SUCCESS_TAG = new TupleTag<UserEvent>() {};
+
+  /** Output tag for the {@link UserEvent}s of batches whose import reported errors. */
+  public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {};
+
+  private static final int DEFAULT_BATCH_SIZE = 5000;
+
+  /**
+   * Creates a builder with every required property preset, so that {@code newBuilder().build()}
+   * succeeds: the default catalog and event store, the maximum batch size, and a zero max
+   * buffering duration (which {@link GroupIntoBatches} treats as "no time limit").
+   */
+  static Builder newBuilder() {
+    return new AutoValue_RecommendationAIImportUserEvents.Builder()
+        .setCatalogName("default_catalog")
+        .setEventStore("default_event_store")
+        .setBatchSize(DEFAULT_BATCH_SIZE)
+        .setMaxBufferingDuration(Duration.ZERO);
+  }
+
+  abstract Builder toBuilder();
+
+  /** @return ID of Google Cloud project to be used for creating user events. */
+  public abstract @Nullable String projectId();
+
+  /** @return Name of the catalog where the user events will be created. */
+  public abstract @Nullable String catalogName();
+
+  /** @return Name of the event store where the user events will be created. */
+  public abstract @Nullable String eventStore();
+
+  /** @return Size of input elements batch to be sent in one request. */
+  public abstract Integer batchSize();
+
+  /**
+   * @return Time limit (in processing time) on how long an incomplete batch of elements is
+   *     allowed to be buffered.
+   */
+  public abstract Duration maxBufferingDuration();
+
+  /** Returns a copy of this transform that targets the given Google Cloud project. */
+  public RecommendationAIImportUserEvents withProjectId(String projectId) {
+    return this.toBuilder().setProjectId(projectId).build();
+  }
+
+  /** Returns a copy of this transform that targets the given catalog. */
+  public RecommendationAIImportUserEvents withCatalogName(String catalogName) {
+    return this.toBuilder().setCatalogName(catalogName).build();
+  }
+
+  /** Returns a copy of this transform that targets the given event store. */
+  public RecommendationAIImportUserEvents withEventStore(String eventStore) {
+    return this.toBuilder().setEventStore(eventStore).build();
+  }
+
+  /** Returns a copy of this transform that sends at most {@code batchSize} events per request. */
+  public RecommendationAIImportUserEvents withBatchSize(Integer batchSize) {
+    return this.toBuilder().setBatchSize(batchSize).build();
+  }
+
+  /**
+   * Returns a copy of this transform that flushes an incomplete batch after the given processing
+   * time has elapsed.
+   */
+  public RecommendationAIImportUserEvents withMaxBufferingDuration(
+      Duration maxBufferingDuration) {
+    return this.toBuilder().setMaxBufferingDuration(maxBufferingDuration).build();
+  }
+
+  /**
+   * The transform groups the input into batches, converts each batch into {@link UserEvent}s and
+   * then calls the Recommendation AI service to import them.
+   *
+   * @param input input PCollection
+   * @return PCollectionTuple with successful and failed {@link UserEvent}s
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) {
+    return input
+        .apply(
+            "Batch Contents",
+            GroupIntoBatches.<String, GenericJson>ofSize(batchSize())
+                .withMaxBufferingDuration(maxBufferingDuration())
+                .withShardedKey())
+        .apply(
+            // Step name fixed: this step imports user events, not catalog items.
+            "Import UserEvents",
+            ParDo.of(new ImportUserEvents(projectId(), catalogName(), eventStore()))
+                .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    /** @param projectId ID of Google Cloud project to be used for creating user events. */
+    public abstract Builder setProjectId(@Nullable String projectId);
+
+    /** @param catalogName Name of the catalog where the user events will be created. */
+    public abstract Builder setCatalogName(@Nullable String catalogName);
+
+    /** @param eventStore Name of the event store where the user events will be created. */
+    public abstract Builder setEventStore(@Nullable String eventStore);
+
+    /**
+     * @param batchSize Amount of input elements to be sent to Recommendation AI service in one
+     *     request.
+     */
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    /**
+     * @param maxBufferingDuration Time limit (in processing time) on how long an incomplete batch
+     *     of elements is allowed to be buffered.
+     */
+    public abstract Builder setMaxBufferingDuration(Duration maxBufferingDuration);
+
+    public abstract RecommendationAIImportUserEvents build();
+  }
+
+  /** {@link DoFn} issuing one inline import request per batch of elements. */
+  private static class ImportUserEvents
+      extends DoFn<KV<ShardedKey<String>, Iterable<GenericJson>>, UserEvent> {
+    private final String projectId;
+    private final String catalogName;
+    private final String eventStore;
+
+    /**
+     * @param projectId ID of GCP project to be used for creating user events.
+     * @param catalogName Catalog name for UserEvent creation.
+     * @param eventStore Event store name for UserEvent creation.
+     */
+    private ImportUserEvents(String projectId, String catalogName, String eventStore) {
+      this.projectId = projectId;
+      this.catalogName = catalogName;
+      this.eventStore = eventStore;
+    }
+
+    /**
+     * Parses every element of the batch into a {@link UserEvent}, imports the batch with one
+     * request and emits all of its events on a single tag: {@link #FAILURE_TAG} when the response
+     * carries error samples or the call raises an {@link ApiException}, {@link #SUCCESS_TAG}
+     * otherwise. An {@link ExecutionException} from waiting on the operation is not caught here.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c)
+        throws IOException, ExecutionException, InterruptedException {
+      EventStoreName parent = EventStoreName.of(projectId, "global", catalogName, eventStore);
+
+      ArrayList<UserEvent> userEvents = new ArrayList<>();
+      for (GenericJson element : c.element().getValue()) {
+        UserEvent.Builder userEventBuilder = UserEvent.newBuilder();
+        JsonFormat.parser().merge((new JSONObject(element)).toString(), userEventBuilder);
+        userEvents.add(userEventBuilder.build());
+      }
+      UserEventInlineSource userEventInlineSource =
+          UserEventInlineSource.newBuilder().addAllUserEvents(userEvents).build();
+
+      InputConfig inputConfig =
+          InputConfig.newBuilder().mergeUserEventInlineSource(userEventInlineSource).build();
+      ImportUserEventsRequest request =
+          ImportUserEventsRequest.newBuilder()
+              .setParent(parent.toString())
+              .setInputConfig(inputConfig)
+              .build();
+
+      TupleTag<UserEvent> outcome;
+      try (UserEventServiceClient userEventServiceClient = UserEventServiceClient.create()) {
+        ImportUserEventsResponse response =
+            userEventServiceClient.importUserEventsAsync(request).get();
+        // The response does not say which events failed, so the whole batch shares one outcome.
+        outcome = response.getErrorSamplesCount() > 0 ? FAILURE_TAG : SUCCESS_TAG;
+      } catch (ApiException e) {
+        outcome = FAILURE_TAG;
+      }
+      for (UserEvent userEvent : userEvents) {
+        c.output(outcome, userEvent);
+      }
+    }
+  }
+}
diff --git
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredict.java
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredict.java
new file mode 100644
index 0000000..64aea84
--- /dev/null
+++
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredict.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.PlacementName;
+import com.google.cloud.recommendationengine.v1beta1.PredictResponse;
+import com.google.cloud.recommendationengine.v1beta1.PredictionServiceClient;
+import com.google.cloud.recommendationengine.v1beta1.UserEvent;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} using the Recommendations AI API (https://cloud.google.com/recommendations).
+ * Takes an input {@link PCollection} of {@link GenericJson}s, converts each element to a {@link
+ * UserEvent} and outputs the {@link PredictResponse.PredictionResult}s returned for it.
+ *
+ * <p>It is possible to provide the catalog name (defaults to "default_catalog"), the event store
+ * (defaults to "default_event_store") and the ID of the recommendation engine placement (defaults
+ * to "recently_viewed_default") that together identify the placement the predictions are requested
+ * from.
+ *
+ * <p>The result is a {@link PCollectionTuple}: prediction results are emitted under {@link
+ * #SUCCESS_TAG}; a user event whose request raised an {@link ApiException} is emitted under {@link
+ * #FAILURE_TAG}.
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIPredict
+    extends PTransform<PCollection<GenericJson>, PCollectionTuple> {
+
+  /** @return ID of Google Cloud project to be used for the predictions. */
+  public abstract @Nullable String projectId();
+
+  /** @return Name of the catalog to be used for the predictions. */
+  public abstract @Nullable String catalogName();
+
+  /** @return Name of the event store to be used for the predictions. */
+  public abstract @Nullable String eventStore();
+
+  /** @return ID of the recommendation engine placement. */
+  public abstract String placementId();
+
+  /** Tag of the output holding the prediction results. */
+  public static final TupleTag<PredictResponse.PredictionResult> SUCCESS_TAG =
+      new TupleTag<PredictResponse.PredictionResult>() {};
+
+  /** Tag of the output holding the user events whose prediction request failed. */
+  public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {};
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    /** @param projectId ID of Google Cloud project to be used for the predictions. */
+    public abstract Builder setProjectId(@Nullable String projectId);
+
+    /** @param catalogName Name of the catalog to be used for predictions. */
+    public abstract Builder setCatalogName(@Nullable String catalogName);
+
+    /** @param eventStore Name of the event store to be used for predictions. */
+    public abstract Builder setEventStore(@Nullable String eventStore);
+
+    /** @param placementId ID of the recommendation engine placement. */
+    public abstract Builder setPlacementId(String placementId);
+
+    public abstract RecommendationAIPredict build();
+  }
+
+  /** @return a builder preset with the default catalog, event store and placement. */
+  static Builder newBuilder() {
+    return new AutoValue_RecommendationAIPredict.Builder()
+        .setCatalogName("default_catalog")
+        .setEventStore("default_event_store")
+        .setPlacementId("recently_viewed_default");
+  }
+
+  abstract Builder toBuilder();
+
+  /** @return a copy of this transform that uses the given project ID. */
+  public RecommendationAIPredict withProjectId(String projectId) {
+    return this.toBuilder().setProjectId(projectId).build();
+  }
+
+  /** @return a copy of this transform that uses the given catalog name. */
+  public RecommendationAIPredict withCatalogName(String catalogName) {
+    return this.toBuilder().setCatalogName(catalogName).build();
+  }
+
+  /** @return a copy of this transform that uses the given event store. */
+  public RecommendationAIPredict withEventStore(String eventStore) {
+    return this.toBuilder().setEventStore(eventStore).build();
+  }
+
+  /** @return a copy of this transform that uses the given placement ID. */
+  public RecommendationAIPredict withPlacementId(String placementId) {
+    return this.toBuilder().setPlacementId(placementId).build();
+  }
+
+  /**
+   * Applies the prediction {@link DoFn} to the input.
+   *
+   * @param input input PCollection of JSON user events
+   * @return PCollectionTuple with prediction results and failed {@link UserEvent}s
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<GenericJson> input) {
+    return input.apply(
+        ParDo.of(new Predict(projectId(), catalogName(), eventStore(), placementId()))
+            .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));
+  }
+
+  private static class Predict extends DoFn<GenericJson, PredictResponse.PredictionResult> {
+    private final String projectId;
+    private final String catalogName;
+    private final String eventStore;
+    private final String placementId;
+
+    // Created in setup() and reused for every element instead of being rebuilt per element.
+    // Transient because it is created on the worker and must not be serialized with the DoFn.
+    private transient PredictionServiceClient predictionServiceClient;
+
+    /**
+     * @param projectId ID of GCP project to be used for the predictions.
+     * @param catalogName Catalog name used to build the placement name.
+     * @param eventStore Event store used to build the placement name.
+     * @param placementId ID of the recommendation engine placement.
+     */
+    private Predict(String projectId, String catalogName, String eventStore, String placementId) {
+      this.projectId = projectId;
+      this.catalogName = catalogName;
+      this.eventStore = eventStore;
+      this.placementId = placementId;
+    }
+
+    /** Creates the service client once per DoFn instance. */
+    @Setup
+    public void setup() throws IOException {
+      predictionServiceClient = PredictionServiceClient.create();
+    }
+
+    /** Closes the service client created in {@link #setup()}. */
+    @Teardown
+    public void teardown() {
+      if (predictionServiceClient != null) {
+        predictionServiceClient.close();
+        predictionServiceClient = null;
+      }
+    }
+
+    /**
+     * Parses the JSON element into a {@link UserEvent}, requests predictions for it and emits every
+     * returned result under {@link #SUCCESS_TAG}. If the request raises an {@link ApiException} the
+     * user event is emitted under {@link #FAILURE_TAG}; results emitted before the exception are
+     * not retracted.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext context) throws IOException {
+      PlacementName name =
+          PlacementName.of(projectId, "global", catalogName, eventStore, placementId);
+      UserEvent.Builder userEventBuilder = UserEvent.newBuilder();
+      JsonFormat.parser().merge((new JSONObject(context.element())).toString(), userEventBuilder);
+      UserEvent userEvent = userEventBuilder.build();
+      try {
+        for (PredictResponse.PredictionResult res :
+            predictionServiceClient.predict(name, userEvent).iterateAll()) {
+          context.output(SUCCESS_TAG, res);
+        }
+      } catch (ApiException e) {
+        context.output(FAILURE_TAG, userEvent);
+      }
+    }
+  }
+}
diff --git
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIWriteUserEvent.java
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIWriteUserEvent.java
new file mode 100644
index 0000000..ba1f2d8
--- /dev/null
+++
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIWriteUserEvent.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.EventStoreName;
+import com.google.cloud.recommendationengine.v1beta1.UserEvent;
+import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} using the Recommendations AI API (https://cloud.google.com/recommendations).
+ * Takes an input {@link PCollection} of {@link GenericJson}s, converts them to {@link UserEvent}s
+ * and writes each one to the service.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the user event (defaults to
+ * "default_catalog"). It is possible to provide an event store to which you want to add the user
+ * event (defaults to "default_event_store").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIWriteUserEvent
+    extends PTransform<PCollection<GenericJson>, PCollectionTuple> {
+
+  /** Tag of the output holding the user events returned by the service. */
+  public static final TupleTag<UserEvent> SUCCESS_TAG = new TupleTag<UserEvent>() {};
+
+  /** Tag of the output holding the user events whose write raised an {@link ApiException}. */
+  public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {};
+
+  /** @return a builder preset with the default catalog and event store. */
+  static Builder newBuilder() {
+    return new AutoValue_RecommendationAIWriteUserEvent.Builder()
+        .setCatalogName("default_catalog")
+        .setEventStore("default_event_store");
+  }
+
+  /** @return ID of Google Cloud project to be used for creating user events. */
+  public abstract @Nullable String projectId();
+
+  /** @return Name of the catalog where the user events will be created. */
+  public abstract @Nullable String catalogName();
+
+  /** @return Name of the event store where the user events will be created. */
+  public abstract @Nullable String eventStore();
+
+  /**
+   * The transform converts the contents of input PCollection into {@link UserEvent}s and then calls
+   * the Recommendation AI service to create the user event.
+   *
+   * @param input input PCollection
+   * @return PCollectionTuple with successful and failed {@link UserEvent}s
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<GenericJson> input) {
+    return input.apply(
+        ParDo.of(new WriteUserEvent(projectId(), catalogName(), eventStore()))
+            .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    /** @param projectId ID of Google Cloud project to be used for creating user events. */
+    public abstract Builder setProjectId(@Nullable String projectId);
+
+    /** @param catalogName Name of the catalog where the user events will be created. */
+    public abstract Builder setCatalogName(@Nullable String catalogName);
+
+    /** @param eventStore Name of the event store where the user events will be created. */
+    public abstract Builder setEventStore(@Nullable String eventStore);
+
+    public abstract RecommendationAIWriteUserEvent build();
+  }
+
+  abstract Builder toBuilder();
+
+  /** @return a copy of this transform that uses the given project ID. */
+  public RecommendationAIWriteUserEvent withProjectId(String projectId) {
+    return this.toBuilder().setProjectId(projectId).build();
+  }
+
+  /** @return a copy of this transform that uses the given catalog name. */
+  public RecommendationAIWriteUserEvent withCatalogName(String catalogName) {
+    return this.toBuilder().setCatalogName(catalogName).build();
+  }
+
+  /** @return a copy of this transform that uses the given event store. */
+  public RecommendationAIWriteUserEvent withEventStore(String eventStore) {
+    return this.toBuilder().setEventStore(eventStore).build();
+  }
+
+  private static class WriteUserEvent extends DoFn<GenericJson, UserEvent> {
+    private final String projectId;
+    private final String catalogName;
+    private final String eventStore;
+
+    // Created in setup() and reused for every element instead of being rebuilt per element.
+    // Transient because it is created on the worker and must not be serialized with the DoFn.
+    private transient UserEventServiceClient userEventServiceClient;
+
+    /**
+     * @param projectId ID of GCP project to be used for creating user events.
+     * @param catalogName Catalog name for UserEvent creation.
+     * @param eventStore Event store for UserEvent creation.
+     */
+    private WriteUserEvent(String projectId, String catalogName, String eventStore) {
+      this.projectId = projectId;
+      this.catalogName = catalogName;
+      this.eventStore = eventStore;
+    }
+
+    /** Creates the service client once per DoFn instance. */
+    @Setup
+    public void setup() throws IOException {
+      userEventServiceClient = UserEventServiceClient.create();
+    }
+
+    /** Closes the service client created in {@link #setup()}. */
+    @Teardown
+    public void teardown() {
+      if (userEventServiceClient != null) {
+        userEventServiceClient.close();
+        userEventServiceClient = null;
+      }
+    }
+
+    /**
+     * Parses the JSON element into a {@link UserEvent} and writes it to the event store. The event
+     * returned by the service is emitted under {@link #SUCCESS_TAG}; if the call raises an {@link
+     * ApiException} the parsed event is emitted under {@link #FAILURE_TAG} instead.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext context)
+        throws IOException, ExecutionException, InterruptedException {
+      EventStoreName parent = EventStoreName.of(projectId, "global", catalogName, eventStore);
+      UserEvent.Builder userEventBuilder = UserEvent.newBuilder();
+      JsonFormat.parser().merge((new JSONObject(context.element())).toString(), userEventBuilder);
+      UserEvent userEvent = userEventBuilder.build();
+
+      try {
+        UserEvent response = userEventServiceClient.writeUserEvent(parent, userEvent);
+        context.output(SUCCESS_TAG, response);
+      } catch (ApiException e) {
+        context.output(FAILURE_TAG, userEvent);
+      }
+    }
+  }
+}
diff --git
a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DelegatingAtomicCoder.java
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DelegatingAtomicCoder.java
new file mode 100644
index 0000000..2efdc04
--- /dev/null
+++
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DelegatingAtomicCoder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+
+/**
+ * An {@link AtomicCoder} for values of type {@code X} that converts each value to an intermediate
+ * representation of type {@code W} and leaves the byte-level work to a {@link Coder} for {@code W}.
+ *
+ * @param <X> type handled by this coder
+ * @param <W> intermediate type handled by the wrapped coder
+ */
+public abstract class DelegatingAtomicCoder<X, W> extends AtomicCoder<X> {
+
+  private final Coder<W> wireCoder;
+
+  /** @param delegate coder used to read and write the intermediate representation. */
+  protected DelegatingAtomicCoder(Coder<W> delegate) {
+    this.wireCoder = delegate;
+  }
+
+  /** Converts a value into the intermediate representation that gets encoded. */
+  protected abstract W to(X object) throws CoderException, IOException;
+
+  /** Rebuilds a value from a decoded intermediate representation. */
+  protected abstract X from(W object) throws CoderException, IOException;
+
+  @Override
+  public final void encode(X value, OutputStream outStream) throws CoderException, IOException {
+    W wireValue = to(value);
+    wireCoder.encode(wireValue, outStream);
+  }
+
+  @Override
+  public final X decode(InputStream inStream) throws CoderException, IOException {
+    W wireValue = wireCoder.decode(inStream);
+    return from(wireValue);
+  }
+
+  /** Deterministic exactly when the wrapped coder reports itself as deterministic. */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    wireCoder.verifyDeterministic();
+  }
+}
diff --git
a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/GenericJsonCoder.java
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/GenericJsonCoder.java
new file mode 100644
index 0000000..a37f6b6
--- /dev/null
+++
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/GenericJsonCoder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.api.client.googleapis.util.Utils;
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.json.JsonFactory;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * A coder for any object that extends {@link GenericJson}. Values are turned into their JSON string
+ * form with the default {@link JsonFactory} and that string is encoded with {@link
+ * StringUtf8Coder}.
+ */
+public class GenericJsonCoder<T extends GenericJson> extends DelegatingAtomicCoder<T, String> {
+
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+  private static final JsonFactory JSON_FACTORY = Utils.getDefaultJsonFactory();
+
+  private final Class<T> type;
+
+  private GenericJsonCoder(Class<T> type) {
+    super(STRING_CODER);
+    this.type = type;
+  }
+
+  /** @return a coder for the given {@link GenericJson} subclass. */
+  public static <T extends GenericJson> GenericJsonCoder<T> of(Class<T> type) {
+    return new GenericJsonCoder<>(type);
+  }
+
+  /**
+   * @param type fully qualified name of the class to create a coder for
+   * @return a coder for the class loaded under that name
+   * @throws ClassNotFoundException if no class with that name can be loaded
+   */
+  @JsonCreator
+  @SuppressWarnings("unchecked")
+  public static <T extends GenericJson> GenericJsonCoder<T> of(@JsonProperty("type") String type)
+      throws ClassNotFoundException {
+    Class<T> resolved = (Class<T>) Class.forName(type);
+    return of(resolved);
+  }
+
+  /** Serializes the object to its JSON string form. */
+  @Override
+  protected String to(T object) throws IOException {
+    return JSON_FACTORY.toString(object);
+  }
+
+  /** Parses a JSON string back into an instance of the configured class. */
+  @Override
+  protected T from(String object) throws IOException {
+    return JSON_FACTORY.fromString(object, type);
+  }
+}
diff --git
a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAICatalogItemIT.java
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAICatalogItemIT.java
new file mode 100644
index 0000000..aee981b
--- /dev/null
+++
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAICatalogItemIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.json.GenericJson;
+import com.google.cloud.recommendationengine.v1beta1.CatalogItem;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for creating and importing catalog items through Recommendations AI. */
+@RunWith(JUnit4.class)
+public class RecommendationAICatalogItemIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** Builds a sample catalog item as JSON whose id is a random integer rendered as a string. */
+  private static GenericJson getCatalogItem() {
+    GenericJson electronics =
+        new GenericJson().set("categories", Arrays.asList("Electronics", "Computers"));
+    GenericJson laptops = new GenericJson().set("categories", Arrays.asList("Laptops"));
+    List<Object> categories = new ArrayList<>();
+    categories.add(electronics);
+    categories.add(laptops);
+
+    String id = Integer.toString(new Random().nextInt());
+    return new GenericJson()
+        .set("id", id)
+        .set("title", "Sample Laptop")
+        .set("description", "Indisputably the most fantastic laptop ever created.")
+        .set("categoryHierarchies", categories)
+        .set("languageCode", "en");
+  }
+
+  @Test
+  public void createCatalogItem() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+    GenericJson catalogItem = getCatalogItem();
+    String expectedId = (String) catalogItem.get("id");
+
+    PCollectionTuple createCatalogItemResult =
+        testPipeline
+            .apply(
+                Create.of(Arrays.asList(catalogItem))
+                    .withCoder(GenericJsonCoder.of(GenericJson.class)))
+            .apply(RecommendationAIIO.createCatalogItems().withProjectId(projectId));
+
+    PAssert.that(createCatalogItemResult.get(RecommendationAICreateCatalogItem.SUCCESS_TAG))
+        .satisfies(new VerifyCatalogItemResult(1, expectedId));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  @Ignore("Import method causing issues")
+  @Test
+  public void importCatalogItems() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+
+    GenericJson catalogItem1 = getCatalogItem();
+    GenericJson catalogItem2 = getCatalogItem();
+
+    List<KV<String, GenericJson>> catalogItems = new ArrayList<>();
+    catalogItems.add(KV.of(Integer.toString(new Random().nextInt()), catalogItem1));
+    catalogItems.add(KV.of(Integer.toString(new Random().nextInt()), catalogItem2));
+
+    PCollectionTuple importCatalogItemResult =
+        testPipeline
+            .apply(Create.of(catalogItems))
+            .apply(RecommendationAIImportCatalogItems.newBuilder().setProjectId(projectId).build());
+
+    PAssert.that(importCatalogItemResult.get(RecommendationAIImportCatalogItems.SUCCESS_TAG))
+        .satisfies(new VerifyCatalogItemResult(2, (String) catalogItem1.get("id")));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Checks that the output has the expected number of items and contains the expected id. */
+  private static class VerifyCatalogItemResult
+      implements SerializableFunction<Iterable<CatalogItem>, Void> {
+
+    private final String catalogItemId;
+    private final int size;
+
+    private VerifyCatalogItemResult(int size, String catalogItemId) {
+      this.size = size;
+      this.catalogItemId = catalogItemId;
+    }
+
+    @Override
+    public Void apply(Iterable<CatalogItem> input) {
+      List<String> ids = new ArrayList<>();
+      for (CatalogItem item : input) {
+        ids.add(item.getId());
+      }
+      assertTrue(ids.contains(this.catalogItemId));
+      assertEquals(size, ids.size());
+      return null;
+    }
+  }
+}
diff --git
a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredictIT.java
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredictIT.java
new file mode 100644
index 0000000..a39b0b1
--- /dev/null
+++
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredictIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.json.GenericJson;
+import com.google.cloud.recommendationengine.v1beta1.PredictResponse;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration test for requesting predictions through Recommendations AI. */
+@RunWith(JUnit4.class)
+public class RecommendationAIPredictIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** Builds a "home-page-view" user event as JSON for visitor "1". */
+  public static GenericJson getUserEvent() {
+    GenericJson visitor = new GenericJson().set("visitorId", "1");
+    GenericJson userEvent = new GenericJson();
+    userEvent.set("eventType", "home-page-view");
+    userEvent.set("userInfo", visitor);
+    return userEvent;
+  }
+
+  @Test
+  public void predict() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+
+    PCollectionTuple predictResult =
+        testPipeline
+            .apply(
+                Create.of(Arrays.asList(getUserEvent()))
+                    .withCoder(GenericJsonCoder.of(GenericJson.class)))
+            .apply(
+                RecommendationAIIO.predictAll()
+                    .withProjectId(projectId)
+                    .withPlacementId("recently_viewed_default"));
+
+    PAssert.that(predictResult.get(RecommendationAIPredict.SUCCESS_TAG))
+        .satisfies(new VerifyPredictResult());
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Checks that at least one prediction result was produced. */
+  private static class VerifyPredictResult
+      implements SerializableFunction<Iterable<PredictResponse.PredictionResult>, Void> {
+
+    @Override
+    public Void apply(Iterable<PredictResponse.PredictionResult> input) {
+      List<PredictResponse.PredictionResult> results = new ArrayList<>();
+      for (PredictResponse.PredictionResult result : input) {
+        results.add(result);
+      }
+      assertTrue(!results.isEmpty());
+      return null;
+    }
+  }
+}
diff --git
a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIUserEventIT.java
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIUserEventIT.java
new file mode 100644
index 0000000..3be60a8
--- /dev/null
+++
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIUserEventIT.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.json.GenericJson;
+import com.google.cloud.recommendationengine.v1beta1.UserEvent;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for writing and importing user events through Recommendations AI. */
+@RunWith(JUnit4.class)
+public class RecommendationAIUserEventIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** Builds a "detail-page-view" user event as JSON for visitor "1" with one product detail. */
+  public static GenericJson getUserEvent() {
+    GenericJson userInfo = new GenericJson().set("visitorId", "1");
+    GenericJson productDetail = new GenericJson().set("id", "1").set("quantity", 1);
+    ArrayList<GenericJson> productDetails = new ArrayList<>();
+    productDetails.add(productDetail);
+    GenericJson productEventDetail = new GenericJson().set("productDetails", productDetails);
+    return new GenericJson()
+        .set("eventType", "detail-page-view")
+        .set("userInfo", userInfo)
+        .set("productEventDetail", productEventDetail);
+  }
+
+  @Test
+  public void createUserEvent() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+
+    PCollectionTuple createUserEventResult =
+        testPipeline
+            .apply(
+                Create.of(Arrays.asList(getUserEvent()))
+                    .withCoder(GenericJsonCoder.of(GenericJson.class)))
+            .apply(RecommendationAIIO.writeUserEvent().withProjectId(projectId));
+
+    PAssert.that(createUserEventResult.get(RecommendationAIWriteUserEvent.SUCCESS_TAG))
+        .satisfies(new VerifyUserEventResult(1));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  @Ignore("Import method causing issues")
+  @Test
+  public void importUserEvents() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+    ArrayList<KV<String, GenericJson>> userEvents = new ArrayList<>();
+    userEvents.add(KV.of("123", getUserEvent()));
+    userEvents.add(KV.of("123", getUserEvent()));
+
+    PCollectionTuple importUserEventResult =
+        testPipeline
+            .apply(Create.of(userEvents))
+            .apply(RecommendationAIImportUserEvents.newBuilder().setProjectId(projectId).build());
+
+    // Read the output with the tag of the transform that was applied (the import transform),
+    // not the tag that belongs to the single-event write transform.
+    PAssert.that(importUserEventResult.get(RecommendationAIImportUserEvents.SUCCESS_TAG))
+        .satisfies(new VerifyUserEventResult(2));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Checks the number of returned user events and that the sample visitor id is among them. */
+  private static class VerifyUserEventResult
+      implements SerializableFunction<Iterable<UserEvent>, Void> {
+
+    int size;
+
+    private VerifyUserEventResult(int size) {
+      this.size = size;
+    }
+
+    @Override
+    public Void apply(Iterable<UserEvent> input) {
+      List<String> matches = new ArrayList<>();
+      for (UserEvent result : input) {
+        matches.add(result.getUserInfo().getVisitorId());
+      }
+      assertTrue(matches.contains(((GenericJson) getUserEvent().get("userInfo")).get("visitorId")));
+      assertEquals(size, matches.size());
+      return null;
+    }
+  }
+}