This is an automated email from the ASF dual-hosted git repository.

mbae pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c986168  [BEAM-11289] [Java] Integrate Google Cloud Recommendations AI 
functionality
c986168 is described below

commit c986168f992ce8c51ef83500b2b9db91cfe3eaf5
Author: Matthias Baetens <[email protected]>
AuthorDate: Sat Jun 19 15:37:34 2021 +0100

    [BEAM-11289] [Java] Integrate Google Cloud Recommendations AI functionality
---
 sdks/java/extensions/ml/build.gradle               |  12 ++
 .../ml/RecommendationAICreateCatalogItem.java      | 134 +++++++++++++
 .../beam/sdk/extensions/ml/RecommendationAIIO.java |  51 +++++
 .../ml/RecommendationAIImportCatalogItems.java     | 200 +++++++++++++++++++
 .../ml/RecommendationAIImportUserEvents.java       | 216 +++++++++++++++++++++
 .../sdk/extensions/ml/RecommendationAIPredict.java | 156 +++++++++++++++
 .../ml/RecommendationAIWriteUserEvent.java         | 146 ++++++++++++++
 .../sdk/extensions/ml/DelegatingAtomicCoder.java   |  53 +++++
 .../beam/sdk/extensions/ml/GenericJsonCoder.java   |  64 ++++++
 .../ml/RecommendationAICatalogItemIT.java          | 119 ++++++++++++
 .../extensions/ml/RecommendationAIPredictIT.java   |  80 ++++++++
 .../extensions/ml/RecommendationAIUserEventIT.java | 111 +++++++++++
 12 files changed, 1342 insertions(+)

diff --git a/sdks/java/extensions/ml/build.gradle 
b/sdks/java/extensions/ml/build.gradle
index b231e8e..a8acfe7 100644
--- a/sdks/java/extensions/ml/build.gradle
+++ b/sdks/java/extensions/ml/build.gradle
@@ -31,13 +31,19 @@ dependencies {
     compile project(path: ":sdks:java:core", configuration: "shadow")
     compile project(":sdks:java:expansion-service")
     permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
+    compile library.java.google_http_client
+    compile 'com.google.cloud:google-cloud-recommendations-ai:0.3.7'
     compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
     compile 'com.google.cloud:google-cloud-dlp:1.1.4'
     compile 'com.google.cloud:google-cloud-language:1.99.4'
+    compile library.java.protobuf_java_util
+    compile group: 'org.json', name: 'json', version: '20201115'
     compile 'com.google.api.grpc:proto-google-cloud-dlp-v2:1.1.4'
     compile 'com.google.api.grpc:proto-google-cloud-language-v1:1.81.4'
     compile 
'com.google.api.grpc:proto-google-cloud-video-intelligence-v1:1.2.0'
     compile 'com.google.api.grpc:proto-google-cloud-vision-v1:1.81.3'
+    compile 
'com.google.api.grpc:proto-google-cloud-recommendations-ai-v1beta1:0.3.7'
+    compile library.java.joda_time
     compile library.java.auto_value_annotations
     compile library.java.gax
     compile library.java.protobuf_java
@@ -46,6 +52,12 @@ dependencies {
     testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
     compile 'com.google.cloud:google-cloud-vision:1.99.3'
     testCompile library.java.mockito_core
+    testCompile library.java.google_http_client
+    testCompile library.java.protobuf_java_util
+    testCompile group: 'org.json', name: 'json', version: '20201115'
+    testCompile 'com.google.cloud:google-cloud-recommendations-ai:0.3.7'
+    testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+    testCompile 'com.google.cloud:google-cloud-dlp:1.1.4'
     testCompile project(path: 
":sdks:java:extensions:google-cloud-platform-core", configuration: 
"testRuntime")
     testRuntimeOnly project(path: ":runners:direct-java", configuration: 
"shadow")
     testRuntimeOnly project(":runners:google-cloud-dataflow-java")
diff --git 
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAICreateCatalogItem.java
 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAICreateCatalogItem.java
new file mode 100644
index 0000000..55c5aba
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAICreateCatalogItem.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.CatalogItem;
+import com.google.cloud.recommendationengine.v1beta1.CatalogName;
+import com.google.cloud.recommendationengine.v1beta1.CatalogServiceClient;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} using the Recommendations AI API (https://cloud.google.com/recommendations).
+ * Takes an input {@link PCollection} of {@link GenericJson}s, converts each of them to a {@link
+ * CatalogItem} and creates it through the API. It outputs a {@link PCollectionTuple} which
+ * contains the successfully created catalog items under {@link #SUCCESS_TAG} and the catalog items
+ * that failed to be created under {@link #FAILURE_TAG}.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the catalog item (defaults
+ * to "default_catalog").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAICreateCatalogItem
+    extends PTransform<PCollection<GenericJson>, PCollectionTuple> {
+
+  /** @return ID of Google Cloud project to be used for creating catalog items. */
+  public abstract @Nullable String projectId();
+
+  /**
+   * @return Name of the catalog where the catalog items will be created (defaults to
+   *     "default_catalog").
+   */
+  public abstract @Nullable String catalogName();
+
+  /** Tag of the output holding the {@link CatalogItem}s returned by the service on success. */
+  public static final TupleTag<CatalogItem> SUCCESS_TAG = new TupleTag<CatalogItem>() {};
+
+  /** Tag of the output holding the {@link CatalogItem}s whose creation call failed. */
+  public static final TupleTag<CatalogItem> FAILURE_TAG = new TupleTag<CatalogItem>() {};
+
+  abstract Builder toBuilder();
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    /** @param projectId ID of Google Cloud project to be used for creating catalog items. */
+    public abstract Builder setProjectId(@Nullable String projectId);
+
+    /** @param catalogName Name of the catalog where the catalog items will be created. */
+    public abstract Builder setCatalogName(@Nullable String catalogName);
+
+    public abstract RecommendationAICreateCatalogItem build();
+  }
+
+  /** @return a builder pre-populated with the "default_catalog" catalog name. */
+  static Builder newBuilder() {
+    return new AutoValue_RecommendationAICreateCatalogItem.Builder()
+        .setCatalogName("default_catalog");
+  }
+
+  /**
+   * @param projectId ID of Google Cloud project to be used for creating catalog items.
+   * @return a copy of this transform that uses the given project.
+   */
+  public RecommendationAICreateCatalogItem withProjectId(String projectId) {
+    return this.toBuilder().setProjectId(projectId).build();
+  }
+
+  /**
+   * @param catalogName Name of the catalog where the catalog items will be created.
+   * @return a copy of this transform that uses the given catalog.
+   */
+  public RecommendationAICreateCatalogItem withCatalogName(String catalogName) {
+    return this.toBuilder().setCatalogName(catalogName).build();
+  }
+
+  /**
+   * The transform converts the contents of input PCollection into {@link CatalogItem}s and then
+   * calls the Recommendation AI service to create the catalog item.
+   *
+   * @param input input PCollection
+   * @return PCollectionTuple with successful and failed {@link CatalogItem}s
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<GenericJson> input) {
+    return input.apply(
+        ParDo.of(new CreateCatalogItem(projectId(), catalogName()))
+            .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));
+  }
+
+  /** Converts each JSON element to a {@link CatalogItem} and creates it via the service. */
+  private static class CreateCatalogItem extends DoFn<GenericJson, CatalogItem> {
+    private final String projectId;
+    private final String catalogName;
+
+    // Created once per DoFn instance in setup() and reused for every element, instead of opening
+    // and closing a client per element. Transient because the DoFn itself gets serialized.
+    private transient CatalogServiceClient catalogServiceClient;
+
+    /**
+     * @param projectId ID of GCP project to be used for creating catalog items.
+     * @param catalogName Catalog name for CatalogItem creation.
+     */
+    private CreateCatalogItem(String projectId, String catalogName) {
+      this.projectId = projectId;
+      this.catalogName = catalogName;
+    }
+
+    /** Opens the service client shared by all elements processed by this instance. */
+    @Setup
+    public void setup() throws IOException {
+      catalogServiceClient = CatalogServiceClient.create();
+    }
+
+    /** Releases the service client, if one was opened. */
+    @Teardown
+    public void teardown() {
+      if (catalogServiceClient != null) {
+        catalogServiceClient.close();
+        catalogServiceClient = null;
+      }
+    }
+
+    /**
+     * Parses the element into a {@link CatalogItem} and sends a create request. The service
+     * response is emitted to {@link #SUCCESS_TAG}; if the call fails with an {@link ApiException}
+     * the parsed item is emitted to {@link #FAILURE_TAG} instead.
+     *
+     * @throws IOException if the element cannot be parsed into a {@link CatalogItem}.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext context) throws IOException {
+      CatalogName parent = CatalogName.of(projectId, "global", catalogName);
+      CatalogItem.Builder catalogItemBuilder = CatalogItem.newBuilder();
+      String json = new JSONObject(context.element()).toString();
+      JsonFormat.parser().merge(json, catalogItemBuilder);
+      CatalogItem catalogItem = catalogItemBuilder.build();
+
+      try {
+        CatalogItem response = catalogServiceClient.createCatalogItem(parent, catalogItem);
+
+        context.output(SUCCESS_TAG, response);
+      } catch (ApiException e) {
+        // Route the item to the failure output rather than failing the whole bundle.
+        context.output(FAILURE_TAG, catalogItem);
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIIO.java
 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIIO.java
new file mode 100644
index 0000000..48ba7d6
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIIO.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+/**
+ * The RecommendationAIIO class acts as a wrapper around the {@link
+ * org.apache.beam.sdk.transforms.PTransform}s that interact with the Recommendation AI API
+ * (https://cloud.google.com/recommendations).
+ *
+ * <p>More information can be found on:
+ *
+ * <ul>
+ *   <li>Writing catalog items using {@link RecommendationAICreateCatalogItem}
+ *   <li>Importing catalog items using {@link RecommendationAIImportCatalogItems}
+ *   <li>Writing user events using {@link RecommendationAIWriteUserEvent}
+ *   <li>Importing user events using {@link RecommendationAIImportUserEvents}
+ *   <li>Making predictions using {@link RecommendationAIPredict}
+ * </ul>
+ */
+public class RecommendationAIIO {
+
+  /** @return a {@link RecommendationAICreateCatalogItem} transform built from a fresh builder. */
+  public static RecommendationAICreateCatalogItem createCatalogItems() {
+    return RecommendationAICreateCatalogItem.newBuilder().build();
+  }
+
+  /** @return a {@link RecommendationAIImportCatalogItems} transform built from a fresh builder. */
+  public static RecommendationAIImportCatalogItems importCatalogItems() {
+    return RecommendationAIImportCatalogItems.newBuilder().build();
+  }
+
+  /** @return a {@link RecommendationAIWriteUserEvent} transform built from a fresh builder. */
+  public static RecommendationAIWriteUserEvent writeUserEvent() {
+    return RecommendationAIWriteUserEvent.newBuilder().build();
+  }
+
+  /** @return a {@link RecommendationAIImportUserEvents} transform built from a fresh builder. */
+  public static RecommendationAIImportUserEvents importUserEvents() {
+    return RecommendationAIImportUserEvents.newBuilder().build();
+  }
+
+  /** @return a {@link RecommendationAIPredict} transform built from a fresh builder. */
+  public static RecommendationAIPredict predictAll() {
+    return RecommendationAIPredict.newBuilder().build();
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java
 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java
new file mode 100644
index 0000000..1bea407
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.CatalogInlineSource;
+import com.google.cloud.recommendationengine.v1beta1.CatalogItem;
+import com.google.cloud.recommendationengine.v1beta1.CatalogName;
+import com.google.cloud.recommendationengine.v1beta1.CatalogServiceClient;
+import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsRequest;
+import 
com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsResponse;
+import com.google.cloud.recommendationengine.v1beta1.InputConfig;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} connecting to the Recommendations AI API
+ * (https://cloud.google.com/recommendations) and creating {@link CatalogItem}s.
+ *
+ * <p>Batch size defines how many items are created at once per batch (max: 5000, which is also
+ * the default).
+ *
+ * <p>The transform consumes {@link KV} of {@link String} and {@link GenericJson}s (assumed to be
+ * the catalog item id as key and contents as value) and outputs a PCollectionTuple which will
+ * contain the successfully created and failed catalog items.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the catalog item (defaults
+ * to "default_catalog").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIImportCatalogItems
+    extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> {
+
+  /** Tag of the output holding the {@link CatalogItem}s of batches imported without errors. */
+  public static final TupleTag<CatalogItem> SUCCESS_TAG = new TupleTag<CatalogItem>() {};
+
+  /** Tag of the output holding the {@link CatalogItem}s of batches whose import failed. */
+  public static final TupleTag<CatalogItem> FAILURE_TAG = new TupleTag<CatalogItem>() {};
+
+  /**
+   * Every non-nullable property gets a default here: AutoValue's generated {@code build()} rejects
+   * a builder with unset required properties, and {@link RecommendationAIIO#importCatalogItems()}
+   * builds straight from this builder.
+   *
+   * @return a builder using "default_catalog", a batch size of 5000 and no buffering time limit.
+   */
+  static Builder newBuilder() {
+    return new AutoValue_RecommendationAIImportCatalogItems.Builder()
+        .setCatalogName("default_catalog")
+        .setBatchSize(5000)
+        .setMaxBufferingDuration(Duration.ZERO);
+  }
+
+  abstract Builder toBuilder();
+
+  /** @return ID of Google Cloud project to be used for creating catalog items. */
+  public abstract @Nullable String projectId();
+
+  /**
+   * @return Name of the catalog where the catalog items will be created (defaults to
+   *     "default_catalog").
+   */
+  public abstract @Nullable String catalogName();
+
+  /** @return Size of input elements batch to be sent in one request (defaults to 5000). */
+  public abstract Integer batchSize();
+
+  /**
+   * @return Time limit (in processing time) on how long an incomplete batch of elements is allowed
+   *     to be buffered. {@link Duration#ZERO} (the default) means no time limit is configured.
+   */
+  public abstract Duration maxBufferingDuration();
+
+  /**
+   * @param projectId ID of Google Cloud project to be used for creating catalog items.
+   * @return a copy of this transform that uses the given project.
+   */
+  public RecommendationAIImportCatalogItems withProjectId(String projectId) {
+    return this.toBuilder().setProjectId(projectId).build();
+  }
+
+  /**
+   * @param catalogName Name of the catalog where the catalog items will be created.
+   * @return a copy of this transform that uses the given catalog.
+   */
+  public RecommendationAIImportCatalogItems withCatalogName(String catalogName) {
+    return this.toBuilder().setCatalogName(catalogName).build();
+  }
+
+  /**
+   * @param batchSize Amount of input elements to be sent to Recommendation AI service in one
+   *     request.
+   * @return a copy of this transform that uses the given batch size.
+   */
+  public RecommendationAIImportCatalogItems withBatchSize(Integer batchSize) {
+    return this.toBuilder().setBatchSize(batchSize).build();
+  }
+
+  /**
+   * @param maxBufferingDuration Time limit (in processing time) on how long an incomplete batch of
+   *     elements is allowed to be buffered.
+   * @return a copy of this transform that uses the given buffering limit.
+   */
+  public RecommendationAIImportCatalogItems withMaxBufferingDuration(
+      Duration maxBufferingDuration) {
+    return this.toBuilder().setMaxBufferingDuration(maxBufferingDuration).build();
+  }
+
+  /**
+   * The transform converts the contents of input PCollection into {@link CatalogItem}s and then
+   * calls the Recommendation AI service to create the catalog item.
+   *
+   * @param input input PCollection
+   * @return PCollectionTuple with successful and failed {@link CatalogItem}s
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) {
+    GroupIntoBatches<String, GenericJson> batching =
+        GroupIntoBatches.<String, GenericJson>ofSize(batchSize());
+    // Duration.ZERO is this transform's "no limit" default, so only forward an explicit limit.
+    if (!Duration.ZERO.equals(maxBufferingDuration())) {
+      batching = batching.withMaxBufferingDuration(maxBufferingDuration());
+    }
+    return input
+        .apply("Batch Contents", batching.withShardedKey())
+        .apply(
+            "Import CatalogItems",
+            ParDo.of(new ImportCatalogItems(projectId(), catalogName()))
+                .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    /** @param projectId ID of Google Cloud project to be used for creating catalog items. */
+    public abstract Builder setProjectId(@Nullable String projectId);
+
+    /** @param catalogName Name of the catalog where the catalog items will be created. */
+    public abstract Builder setCatalogName(@Nullable String catalogName);
+
+    /**
+     * @param batchSize Amount of input elements to be sent to Recommendation AI service in one
+     *     request.
+     */
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    /**
+     * @param maxBufferingDuration Time limit (in processing time) on how long an incomplete batch
+     *     of elements is allowed to be buffered.
+     */
+    public abstract Builder setMaxBufferingDuration(Duration maxBufferingDuration);
+
+    public abstract RecommendationAIImportCatalogItems build();
+  }
+
+  /** Converts one batch of JSON elements to {@link CatalogItem}s and imports them in one call. */
+  private static class ImportCatalogItems
+      extends DoFn<KV<ShardedKey<String>, Iterable<GenericJson>>, CatalogItem> {
+    private final String projectId;
+    private final String catalogName;
+
+    // Created once per DoFn instance in setup() and reused for every batch. Transient because the
+    // DoFn itself gets serialized.
+    private transient CatalogServiceClient catalogServiceClient;
+
+    /**
+     * @param projectId ID of GCP project to be used for creating catalog items.
+     * @param catalogName Catalog name for CatalogItem creation.
+     */
+    private ImportCatalogItems(String projectId, String catalogName) {
+      this.projectId = projectId;
+      this.catalogName = catalogName;
+    }
+
+    /** Opens the service client shared by all batches processed by this instance. */
+    @Setup
+    public void setup() throws IOException {
+      catalogServiceClient = CatalogServiceClient.create();
+    }
+
+    /** Releases the service client, if one was opened. */
+    @Teardown
+    public void teardown() {
+      if (catalogServiceClient != null) {
+        catalogServiceClient.close();
+        catalogServiceClient = null;
+      }
+    }
+
+    /**
+     * Imports the batch with a single inline import request and waits for it to finish. All items
+     * of the batch go to {@link #SUCCESS_TAG} when the response reports no error samples, and to
+     * {@link #FAILURE_TAG} when it reports any or when the call fails with an {@link
+     * ApiException}.
+     *
+     * @throws IOException if an element cannot be parsed into a {@link CatalogItem}.
+     * @throws ExecutionException if waiting for the import operation fails.
+     * @throws InterruptedException if interrupted while waiting for the import operation.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c)
+        throws IOException, ExecutionException, InterruptedException {
+      CatalogName parent = CatalogName.of(projectId, "global", catalogName);
+
+      ArrayList<CatalogItem> catalogItems = new ArrayList<>();
+      for (GenericJson element : c.element().getValue()) {
+        CatalogItem.Builder catalogItemBuilder = CatalogItem.newBuilder();
+        JsonFormat.parser().merge(new JSONObject(element).toString(), catalogItemBuilder);
+        catalogItems.add(catalogItemBuilder.build());
+      }
+      CatalogInlineSource catalogInlineSource =
+          CatalogInlineSource.newBuilder().addAllCatalogItems(catalogItems).build();
+
+      InputConfig inputConfig =
+          InputConfig.newBuilder().mergeCatalogInlineSource(catalogInlineSource).build();
+      ImportCatalogItemsRequest request =
+          ImportCatalogItemsRequest.newBuilder()
+              .setParent(parent.toString())
+              .setInputConfig(inputConfig)
+              .build();
+
+      // The whole batch shares one outcome, so pick the output tag once.
+      TupleTag<CatalogItem> outcome;
+      try {
+        ImportCatalogItemsResponse response =
+            catalogServiceClient.importCatalogItemsAsync(request).get();
+        outcome = response.getErrorSamplesCount() > 0 ? FAILURE_TAG : SUCCESS_TAG;
+      } catch (ApiException e) {
+        // A failed call must land on the failure output (it used to be emitted as a success).
+        outcome = FAILURE_TAG;
+      }
+      for (CatalogItem catalogItem : catalogItems) {
+        c.output(outcome, catalogItem);
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java
 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java
new file mode 100644
index 0000000..762c379
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.EventStoreName;
+import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsRequest;
+import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsResponse;
+import com.google.cloud.recommendationengine.v1beta1.InputConfig;
+import com.google.cloud.recommendationengine.v1beta1.UserEvent;
+import com.google.cloud.recommendationengine.v1beta1.UserEventInlineSource;
+import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.util.ShardedKey;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} connecting to the Recommendations AI API
+ * (https://cloud.google.com/recommendations) and creating {@link UserEvent}s.
+ *
+ * <p>Batch size defines how many items are created at once per batch (max: 5000, which is also
+ * the default).
+ *
+ * <p>The transform consumes {@link KV} of {@link String} and {@link GenericJson}s (assumed to be
+ * the user event id as key and contents as value) and outputs a PCollectionTuple which will contain
+ * the successfully created and failed user events.
+ *
+ * <p>It is possible to provide the catalog name under which the user events are stored (defaults
+ * to "default_catalog"). It is possible to provide an event store to which you want to add the
+ * user event (defaults to "default_event_store").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIImportUserEvents
+    extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> {
+
+  /** Tag of the output holding the {@link UserEvent}s of batches imported without errors. */
+  public static final TupleTag<UserEvent> SUCCESS_TAG = new TupleTag<UserEvent>() {};
+
+  /** Tag of the output holding the {@link UserEvent}s of batches whose import failed. */
+  public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {};
+
+  /**
+   * Every non-nullable property gets a default here: AutoValue's generated {@code build()} rejects
+   * a builder with unset required properties, and {@link RecommendationAIIO#importUserEvents()}
+   * builds straight from this builder.
+   *
+   * @return a builder using "default_catalog", "default_event_store", a batch size of 5000 and no
+   *     buffering time limit.
+   */
+  static Builder newBuilder() {
+    return new AutoValue_RecommendationAIImportUserEvents.Builder()
+        .setCatalogName("default_catalog")
+        .setEventStore("default_event_store")
+        .setBatchSize(5000)
+        .setMaxBufferingDuration(Duration.ZERO);
+  }
+
+  abstract Builder toBuilder();
+
+  /** @return ID of Google Cloud project to be used for creating user events. */
+  public abstract @Nullable String projectId();
+
+  /** @return Name of the catalog where the user events will be created. */
+  public abstract @Nullable String catalogName();
+
+  /** @return Name of the event store where the user events will be created. */
+  public abstract @Nullable String eventStore();
+
+  /** @return Size of input elements batch to be sent in one request (defaults to 5000). */
+  public abstract Integer batchSize();
+
+  /**
+   * @return Time limit (in processing time) on how long an incomplete batch of elements is allowed
+   *     to be buffered. {@link Duration#ZERO} (the default) means no time limit is configured.
+   */
+  public abstract Duration maxBufferingDuration();
+
+  /**
+   * @param projectId ID of Google Cloud project to be used for creating user events.
+   * @return a copy of this transform that uses the given project.
+   */
+  public RecommendationAIImportUserEvents withProjectId(String projectId) {
+    return this.toBuilder().setProjectId(projectId).build();
+  }
+
+  /**
+   * @param catalogName Name of the catalog where the user events will be created.
+   * @return a copy of this transform that uses the given catalog.
+   */
+  public RecommendationAIImportUserEvents withCatalogName(String catalogName) {
+    return this.toBuilder().setCatalogName(catalogName).build();
+  }
+
+  /**
+   * @param eventStore Name of the event store where the user events will be created.
+   * @return a copy of this transform that uses the given event store.
+   */
+  public RecommendationAIImportUserEvents withEventStore(String eventStore) {
+    return this.toBuilder().setEventStore(eventStore).build();
+  }
+
+  /**
+   * @param batchSize Amount of input elements to be sent to Recommendation AI service in one
+   *     request.
+   * @return a copy of this transform that uses the given batch size.
+   */
+  public RecommendationAIImportUserEvents withBatchSize(Integer batchSize) {
+    return this.toBuilder().setBatchSize(batchSize).build();
+  }
+
+  /**
+   * @param maxBufferingDuration Time limit (in processing time) on how long an incomplete batch of
+   *     elements is allowed to be buffered.
+   * @return a copy of this transform that uses the given buffering limit.
+   */
+  public RecommendationAIImportUserEvents withMaxBufferingDuration(
+      Duration maxBufferingDuration) {
+    return this.toBuilder().setMaxBufferingDuration(maxBufferingDuration).build();
+  }
+
+  /**
+   * The transform converts the contents of input PCollection into {@link UserEvent}s and then calls
+   * the Recommendation AI service to create the user event.
+   *
+   * @param input input PCollection
+   * @return PCollectionTuple with successful and failed {@link UserEvent}s
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) {
+    GroupIntoBatches<String, GenericJson> batching =
+        GroupIntoBatches.<String, GenericJson>ofSize(batchSize());
+    // Duration.ZERO is this transform's "no limit" default, so only forward an explicit limit.
+    if (!Duration.ZERO.equals(maxBufferingDuration())) {
+      batching = batching.withMaxBufferingDuration(maxBufferingDuration());
+    }
+    return input
+        .apply("Batch Contents", batching.withShardedKey())
+        .apply(
+            "Import UserEvents",
+            ParDo.of(new ImportUserEvents(projectId(), catalogName(), eventStore()))
+                .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    /** @param projectId ID of Google Cloud project to be used for creating user events. */
+    public abstract Builder setProjectId(@Nullable String projectId);
+
+    /** @param catalogName Name of the catalog where the user events will be created. */
+    public abstract Builder setCatalogName(@Nullable String catalogName);
+
+    /** @param eventStore Name of the event store where the user events will be created. */
+    public abstract Builder setEventStore(@Nullable String eventStore);
+
+    /**
+     * @param batchSize Amount of input elements to be sent to Recommendation AI service in one
+     *     request.
+     */
+    public abstract Builder setBatchSize(Integer batchSize);
+
+    /**
+     * @param maxBufferingDuration Time limit (in processing time) on how long an incomplete batch
+     *     of elements is allowed to be buffered.
+     */
+    public abstract Builder setMaxBufferingDuration(Duration maxBufferingDuration);
+
+    public abstract RecommendationAIImportUserEvents build();
+  }
+
+  /** Converts one batch of JSON elements to {@link UserEvent}s and imports them in one call. */
+  private static class ImportUserEvents
+      extends DoFn<KV<ShardedKey<String>, Iterable<GenericJson>>, UserEvent> {
+    private final String projectId;
+    private final String catalogName;
+    private final String eventStore;
+
+    // Created once per DoFn instance in setup() and reused for every batch. Transient because the
+    // DoFn itself gets serialized.
+    private transient UserEventServiceClient userEventServiceClient;
+
+    /**
+     * @param projectId ID of GCP project to be used for creating user events.
+     * @param catalogName Catalog name for UserEvent creation.
+     * @param eventStore Event store name for UserEvent creation.
+     */
+    private ImportUserEvents(String projectId, String catalogName, String eventStore) {
+      this.projectId = projectId;
+      this.catalogName = catalogName;
+      this.eventStore = eventStore;
+    }
+
+    /** Opens the service client shared by all batches processed by this instance. */
+    @Setup
+    public void setup() throws IOException {
+      userEventServiceClient = UserEventServiceClient.create();
+    }
+
+    /** Releases the service client, if one was opened. */
+    @Teardown
+    public void teardown() {
+      if (userEventServiceClient != null) {
+        userEventServiceClient.close();
+        userEventServiceClient = null;
+      }
+    }
+
+    /**
+     * Imports the batch with a single inline import request and waits for it to finish. All events
+     * of the batch go to {@link #SUCCESS_TAG} when the response reports no error samples, and to
+     * {@link #FAILURE_TAG} when it reports any or when the call fails with an {@link
+     * ApiException}.
+     *
+     * @throws IOException if an element cannot be parsed into a {@link UserEvent}.
+     * @throws ExecutionException if waiting for the import operation fails.
+     * @throws InterruptedException if interrupted while waiting for the import operation.
+     */
+    @ProcessElement
+    public void processElement(ProcessContext c)
+        throws IOException, ExecutionException, InterruptedException {
+      EventStoreName parent = EventStoreName.of(projectId, "global", catalogName, eventStore);
+
+      ArrayList<UserEvent> userEvents = new ArrayList<>();
+      for (GenericJson element : c.element().getValue()) {
+        UserEvent.Builder userEventBuilder = UserEvent.newBuilder();
+        JsonFormat.parser().merge(new JSONObject(element).toString(), userEventBuilder);
+        userEvents.add(userEventBuilder.build());
+      }
+      UserEventInlineSource userEventInlineSource =
+          UserEventInlineSource.newBuilder().addAllUserEvents(userEvents).build();
+
+      InputConfig inputConfig =
+          InputConfig.newBuilder().mergeUserEventInlineSource(userEventInlineSource).build();
+      ImportUserEventsRequest request =
+          ImportUserEventsRequest.newBuilder()
+              .setParent(parent.toString())
+              .setInputConfig(inputConfig)
+              .build();
+
+      // The whole batch shares one outcome, so pick the output tag once.
+      TupleTag<UserEvent> outcome;
+      try {
+        ImportUserEventsResponse response =
+            userEventServiceClient.importUserEventsAsync(request).get();
+        outcome = response.getErrorSamplesCount() > 0 ? FAILURE_TAG : SUCCESS_TAG;
+      } catch (ApiException e) {
+        outcome = FAILURE_TAG;
+      }
+      for (UserEvent userEvent : userEvents) {
+        c.output(outcome, userEvent);
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredict.java
 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredict.java
new file mode 100644
index 0000000..64aea84
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredict.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.PlacementName;
+import com.google.cloud.recommendationengine.v1beta1.PredictResponse;
+import com.google.cloud.recommendationengine.v1beta1.PredictionServiceClient;
+import com.google.cloud.recommendationengine.v1beta1.UserEvent;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} using the Recommendations AI API (https://cloud.google.com/recommendations).
+ * Takes an input {@link PCollection} of {@link GenericJson}s, converts each of them to a {@link
+ * UserEvent} and requests the {@link PredictResponse.PredictionResult}s for that event.
+ *
+ * <p>It is possible to provide the catalog name (defaults to "default_catalog"), the event store
+ * (defaults to "default_event_store") and the ID of the recommendation engine placement (defaults
+ * to "recently_viewed_default") that are used to build the placement queried for predictions.
+ *
+ * <p>The resulting {@link PCollectionTuple} contains the prediction results under {@link
+ * #SUCCESS_TAG} and, under {@link #FAILURE_TAG}, the {@link UserEvent}s for which the call failed
+ * with an {@link ApiException}.
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIPredict
+    extends PTransform<PCollection<GenericJson>, PCollectionTuple> {
+
+  /** @return ID of Google Cloud project to be used for the predictions. */
+  public abstract @Nullable String projectId();
+
+  /** @return Name of the catalog to be used for the predictions. */
+  public abstract @Nullable String catalogName();
+
+  /** @return Name of the event store to be used for the predictions. */
+  public abstract @Nullable String eventStore();
+
+  /** @return ID of the recommendation engine placement. */
+  public abstract String placementId();
+
+  /** Tag of the output holding the prediction results returned by the service. */
+  public static final TupleTag<PredictResponse.PredictionResult> SUCCESS_TAG =
+      new TupleTag<PredictResponse.PredictionResult>() {};
+
+  /** Tag of the output holding the user events whose prediction request failed. */
+  public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {};
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    /** @param projectId ID of Google Cloud project to be used for the predictions. */
+    public abstract Builder setProjectId(@Nullable String projectId);
+
+    /** @param catalogName Name of the catalog to be used for predictions. */
+    public abstract Builder setCatalogName(@Nullable String catalogName);
+
+    /** @param eventStore Name of the event store to be used for predictions. */
+    public abstract Builder setEventStore(@Nullable String eventStore);
+
+    /** @param placementId ID of the recommendation engine placement. */
+    public abstract Builder setPlacementId(String placementId);
+
+    public abstract RecommendationAIPredict build();
+  }
+
+  /**
+   * @return a builder preset with the default catalog name, event store and placement ID; the
+   *     project ID is left unset.
+   */
+  static Builder newBuilder() {
+    return new AutoValue_RecommendationAIPredict.Builder()
+        .setCatalogName("default_catalog")
+        .setEventStore("default_event_store")
+        .setPlacementId("recently_viewed_default");
+  }
+
+  abstract Builder toBuilder();
+
+  /** @return a copy of this transform that uses the given project ID. */
+  public RecommendationAIPredict withProjectId(String projectId) {
+    return this.toBuilder().setProjectId(projectId).build();
+  }
+
+  /** @return a copy of this transform that uses the given catalog name. */
+  public RecommendationAIPredict withCatalogName(String catalogName) {
+    return this.toBuilder().setCatalogName(catalogName).build();
+  }
+
+  /** @return a copy of this transform that uses the given event store. */
+  public RecommendationAIPredict withEventStore(String eventStore) {
+    return this.toBuilder().setEventStore(eventStore).build();
+  }
+
+  /** @return a copy of this transform that uses the given placement ID. */
+  public RecommendationAIPredict withPlacementId(String placementId) {
+    return this.toBuilder().setPlacementId(placementId).build();
+  }
+
+  /**
+   * Converts the contents of the input PCollection into {@link UserEvent}s and calls the
+   * prediction service for each of them.
+   *
+   * @param input input PCollection
+   * @return PCollectionTuple with the prediction results and the failed {@link UserEvent}s
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<GenericJson> input) {
+    return input.apply(
+        ParDo.of(new Predict(projectId(), catalogName(), eventStore(), placementId()))
+            .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));
+  }
+
+  /** Calls the prediction service once per input element. */
+  private static class Predict extends DoFn<GenericJson, PredictResponse.PredictionResult> {
+    private final String projectId;
+    private final String catalogName;
+    private final String eventStore;
+    private final String placementId;
+
+    // Created in setup() and reused for every element instead of being rebuilt per element;
+    // transient because the client must not be serialized together with the DoFn.
+    private transient PredictionServiceClient predictionServiceClient;
+
+    /**
+     * @param projectId ID of GCP project to be used for the predictions.
+     * @param catalogName Catalog name to be used for the predictions.
+     * @param eventStore Event store to be used for the predictions.
+     * @param placementId ID of the recommendation engine placement.
+     */
+    private Predict(String projectId, String catalogName, String eventStore, String placementId) {
+      this.projectId = projectId;
+      this.catalogName = catalogName;
+      this.eventStore = eventStore;
+      this.placementId = placementId;
+    }
+
+    /**
+     * Creates the service client used by {@link #processElement}.
+     *
+     * @throws IOException if the client cannot be created
+     */
+    @Setup
+    public void setup() throws IOException {
+      predictionServiceClient = PredictionServiceClient.create();
+    }
+
+    /** Closes the service client created in {@link #setup}, if any. */
+    @Teardown
+    public void teardown() {
+      if (predictionServiceClient != null) {
+        predictionServiceClient.close();
+        predictionServiceClient = null;
+      }
+    }
+
+    /**
+     * Parses the element into a {@link UserEvent}, requests predictions for it and emits every
+     * returned result to {@link #SUCCESS_TAG}. If the request fails with an {@link ApiException}
+     * the user event is emitted to {@link #FAILURE_TAG} instead.
+     *
+     * @throws IOException if the element cannot be parsed into a {@link UserEvent}
+     */
+    @ProcessElement
+    public void processElement(ProcessContext context) throws IOException {
+      PlacementName name =
+          PlacementName.of(projectId, "global", catalogName, eventStore, placementId);
+      UserEvent.Builder userEventBuilder = UserEvent.newBuilder();
+      JsonFormat.parser().merge((new JSONObject(context.element())).toString(), userEventBuilder);
+      UserEvent userEvent = userEventBuilder.build();
+      try {
+        for (PredictResponse.PredictionResult res :
+            predictionServiceClient.predict(name, userEvent).iterateAll()) {
+          context.output(SUCCESS_TAG, res);
+        }
+      } catch (ApiException e) {
+        context.output(FAILURE_TAG, userEvent);
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIWriteUserEvent.java
 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIWriteUserEvent.java
new file mode 100644
index 0000000..ba1f2d8
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIWriteUserEvent.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.EventStoreName;
+import com.google.cloud.recommendationengine.v1beta1.UserEvent;
+import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.concurrent.ExecutionException;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} using the Recommendations AI API (https://cloud.google.com/recommendations).
+ * Takes an input {@link PCollection} of {@link GenericJson}s, converts them to {@link UserEvent}s
+ * and writes them to the Recommendations AI service.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the user event (defaults to
+ * "default_catalog"). It is possible to provide an event store to which you want to add the user
+ * event (defaults to "default_event_store").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIWriteUserEvent
+    extends PTransform<PCollection<GenericJson>, PCollectionTuple> {
+
+  /** Tag of the output holding the user events returned by the service after a write. */
+  public static final TupleTag<UserEvent> SUCCESS_TAG = new TupleTag<UserEvent>() {};
+
+  /** Tag of the output holding the user events whose write request failed. */
+  public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {};
+
+  /**
+   * @return a builder preset with the default catalog name and event store; the project ID is left
+   *     unset.
+   */
+  static Builder newBuilder() {
+    return new AutoValue_RecommendationAIWriteUserEvent.Builder()
+        .setCatalogName("default_catalog")
+        .setEventStore("default_event_store");
+  }
+
+  /** @return ID of Google Cloud project to be used for creating user events. */
+  public abstract @Nullable String projectId();
+
+  /** @return Name of the catalog where the user events will be created. */
+  public abstract @Nullable String catalogName();
+
+  /** @return Name of the event store where the user events will be created. */
+  public abstract @Nullable String eventStore();
+
+  /**
+   * The transform converts the contents of input PCollection into {@link UserEvent}s and then calls
+   * the Recommendation AI service to create the user event.
+   *
+   * @param input input PCollection
+   * @return PCollectionTuple with successful and failed {@link UserEvent}s
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<GenericJson> input) {
+    return input.apply(
+        ParDo.of(new WriteUserEvent(projectId(), catalogName(), eventStore()))
+            .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG)));
+  }
+
+  @AutoValue.Builder
+  abstract static class Builder {
+    /** @param projectId ID of Google Cloud project to be used for creating user events. */
+    public abstract Builder setProjectId(@Nullable String projectId);
+
+    /** @param catalogName Name of the catalog where the user events will be created. */
+    public abstract Builder setCatalogName(@Nullable String catalogName);
+
+    /** @param eventStore Name of the event store where the user events will be created. */
+    public abstract Builder setEventStore(@Nullable String eventStore);
+
+    public abstract RecommendationAIWriteUserEvent build();
+  }
+
+  abstract Builder toBuilder();
+
+  /** @return a copy of this transform that uses the given project ID. */
+  public RecommendationAIWriteUserEvent withProjectId(String projectId) {
+    return this.toBuilder().setProjectId(projectId).build();
+  }
+
+  /** @return a copy of this transform that uses the given catalog name. */
+  public RecommendationAIWriteUserEvent withCatalogName(String catalogName) {
+    return this.toBuilder().setCatalogName(catalogName).build();
+  }
+
+  /** @return a copy of this transform that uses the given event store. */
+  public RecommendationAIWriteUserEvent withEventStore(String eventStore) {
+    return this.toBuilder().setEventStore(eventStore).build();
+  }
+
+  /** Writes one user event to the service per input element. */
+  private static class WriteUserEvent extends DoFn<GenericJson, UserEvent> {
+    private final String projectId;
+    private final String catalogName;
+    private final String eventStore;
+
+    // Created in setup() and reused for every element instead of being rebuilt per element;
+    // transient because the client must not be serialized together with the DoFn.
+    private transient UserEventServiceClient userEventServiceClient;
+
+    /**
+     * @param projectId ID of GCP project to be used for creating user events.
+     * @param catalogName Catalog name for UserEvent creation.
+     * @param eventStore Event store for UserEvent creation.
+     */
+    private WriteUserEvent(String projectId, String catalogName, String eventStore) {
+      this.projectId = projectId;
+      this.catalogName = catalogName;
+      this.eventStore = eventStore;
+    }
+
+    /**
+     * Creates the service client used by {@link #processElement}.
+     *
+     * @throws IOException if the client cannot be created
+     */
+    @Setup
+    public void setup() throws IOException {
+      userEventServiceClient = UserEventServiceClient.create();
+    }
+
+    /** Closes the service client created in {@link #setup}, if any. */
+    @Teardown
+    public void teardown() {
+      if (userEventServiceClient != null) {
+        userEventServiceClient.close();
+        userEventServiceClient = null;
+      }
+    }
+
+    /**
+     * Parses the element into a {@link UserEvent} and writes it to the event store. The event
+     * returned by the service is emitted to {@link #SUCCESS_TAG}; if the request fails with an
+     * {@link ApiException} the parsed event is emitted to {@link #FAILURE_TAG} instead.
+     *
+     * @throws IOException if the element cannot be parsed into a {@link UserEvent}
+     */
+    @ProcessElement
+    public void processElement(ProcessContext context)
+        throws IOException, ExecutionException, InterruptedException {
+      EventStoreName parent = EventStoreName.of(projectId, "global", catalogName, eventStore);
+      UserEvent.Builder userEventBuilder = UserEvent.newBuilder();
+      JsonFormat.parser().merge((new JSONObject(context.element())).toString(), userEventBuilder);
+      UserEvent userEvent = userEventBuilder.build();
+
+      try {
+        UserEvent response = userEventServiceClient.writeUserEvent(parent, userEvent);
+
+        context.output(SUCCESS_TAG, response);
+      } catch (ApiException e) {
+        context.output(FAILURE_TAG, userEvent);
+      }
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DelegatingAtomicCoder.java
 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DelegatingAtomicCoder.java
new file mode 100644
index 0000000..2efdc04
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/DelegatingAtomicCoder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+
+/**
+ * An {@link AtomicCoder} for values of type {@code X} that delegates the actual encoding and
+ * decoding to a {@link Coder} for an intermediate type {@code W}.
+ *
+ * <p>Subclasses supply the conversions between both types via {@link #to} and {@link #from}.
+ *
+ * @param <X> type of the values handled by this coder
+ * @param <W> type handled by the delegate coder
+ */
+public abstract class DelegatingAtomicCoder<X, W> extends AtomicCoder<X> {
+
+  private final Coder<W> delegate;
+
+  /** @param delegate coder used to read and write the intermediate representation. */
+  protected DelegatingAtomicCoder(Coder<W> delegate) {
+    this.delegate = delegate;
+  }
+
+  /** Converts a value into the representation understood by the delegate coder. */
+  protected abstract W to(X object) throws CoderException, IOException;
+
+  /** Converts the representation read by the delegate coder back into a value. */
+  protected abstract X from(W object) throws CoderException, IOException;
+
+  /** Converts the value with {@link #to} and writes the result with the delegate coder. */
+  @Override
+  public final void encode(X value, OutputStream outStream) throws CoderException, IOException {
+    W converted = to(value);
+    delegate.encode(converted, outStream);
+  }
+
+  /** Reads with the delegate coder and converts the result with {@link #from}. */
+  @Override
+  public final X decode(InputStream inStream) throws CoderException, IOException {
+    W decoded = delegate.decode(inStream);
+    return from(decoded);
+  }
+
+  /** Defers the determinism check to the delegate coder. */
+  @Override
+  public void verifyDeterministic() throws NonDeterministicException {
+    delegate.verifyDeterministic();
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/GenericJsonCoder.java
 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/GenericJsonCoder.java
new file mode 100644
index 0000000..a37f6b6
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/GenericJsonCoder.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.api.client.googleapis.util.Utils;
+import com.google.api.client.json.GenericJson;
+import com.google.api.client.json.JsonFactory;
+import java.io.IOException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+
+/**
+ * A coder for any object that extends {@link GenericJson}. Values are converted to and from their
+ * JSON string form with the default {@link JsonFactory}, and that string is encoded with {@link
+ * StringUtf8Coder}.
+ *
+ * @param <T> concrete {@link GenericJson} type handled by this coder
+ */
+public class GenericJsonCoder<T extends GenericJson> extends DelegatingAtomicCoder<T, String> {
+
+  private static final Coder<String> STRING_CODER = StringUtf8Coder.of();
+  private static final JsonFactory JSON_FACTORY = Utils.getDefaultJsonFactory();
+
+  // Target class handed to the JSON factory when decoding.
+  private final Class<T> type;
+
+  private GenericJsonCoder(Class<T> type) {
+    super(STRING_CODER);
+    this.type = type;
+  }
+
+  /**
+   * @param type class of the values to encode and decode
+   * @return a coder for the given type
+   */
+  public static <T extends GenericJson> GenericJsonCoder<T> of(Class<T> type) {
+    return new GenericJsonCoder<>(type);
+  }
+
+  /**
+   * @param type fully qualified name of the class of the values to encode and decode
+   * @return a coder for the class with the given name
+   * @throws ClassNotFoundException if no class with that name can be loaded
+   */
+  @JsonCreator
+  @SuppressWarnings("unchecked")
+  public static <T extends GenericJson> GenericJsonCoder<T> of(@JsonProperty("type") String type)
+      throws ClassNotFoundException {
+    Class<T> resolved = (Class<T>) Class.forName(type);
+    return of(resolved);
+  }
+
+  /** Serializes the object to its JSON string form. */
+  @Override
+  protected String to(T object) throws IOException {
+    return JSON_FACTORY.toString(object);
+  }
+
+  /** Parses the JSON string into an instance of the configured type. */
+  @Override
+  protected T from(String object) throws IOException {
+    return JSON_FACTORY.fromString(object, type);
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAICatalogItemIT.java
 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAICatalogItemIT.java
new file mode 100644
index 0000000..aee981b
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAICatalogItemIT.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.json.GenericJson;
+import com.google.cloud.recommendationengine.v1beta1.CatalogItem;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for the Recommendations AI catalog item transforms. */
+@RunWith(JUnit4.class)
+public class RecommendationAICatalogItemIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** Builds a sample laptop catalog item as JSON, using a random integer as its id. */
+  private static GenericJson getCatalogItem() {
+    GenericJson computers =
+        new GenericJson().set("categories", Arrays.asList("Electronics", "Computers"));
+    GenericJson laptops = new GenericJson().set("categories", Arrays.asList("Laptops"));
+    List<Object> hierarchies = new ArrayList<>();
+    hierarchies.add(computers);
+    hierarchies.add(laptops);
+
+    GenericJson item = new GenericJson();
+    item.set("id", Integer.toString(new Random().nextInt()));
+    item.set("title", "Sample Laptop");
+    item.set("description", "Indisputably the most fantastic laptop ever created.");
+    item.set("categoryHierarchies", hierarchies);
+    item.set("languageCode", "en");
+    return item;
+  }
+
+  /** Creates a single catalog item and expects exactly that item in the success output. */
+  @Test
+  public void createCatalogItem() {
+    GenericJson catalogItem = getCatalogItem();
+    String expectedId = (String) catalogItem.get("id");
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+
+    Create.Values<GenericJson> source =
+        Create.of(Arrays.asList(catalogItem)).withCoder(GenericJsonCoder.of(GenericJson.class));
+    PCollectionTuple createCatalogItemResult =
+        testPipeline
+            .apply(source)
+            .apply(RecommendationAIIO.createCatalogItems().withProjectId(projectId));
+
+    PAssert.that(createCatalogItemResult.get(RecommendationAICreateCatalogItem.SUCCESS_TAG))
+        .satisfies(new VerifyCatalogItemResult(1, expectedId));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Imports two keyed catalog items and expects both of them in the success output. */
+  @Ignore("Import method causing issues")
+  @Test
+  public void importCatalogItems() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+
+    GenericJson catalogItem1 = getCatalogItem();
+    GenericJson catalogItem2 = getCatalogItem();
+
+    List<KV<String, GenericJson>> catalogItems = new ArrayList<>();
+    for (GenericJson catalogItem : Arrays.asList(catalogItem1, catalogItem2)) {
+      catalogItems.add(KV.of(Integer.toString(new Random().nextInt()), catalogItem));
+    }
+
+    // NOTE(review): unlike createCatalogItem, no coder is set on this Create - confirm whether
+    // that is related to the reason this test is ignored.
+    PCollectionTuple importCatalogItemResult =
+        testPipeline
+            .apply(Create.of(catalogItems))
+            .apply(RecommendationAIImportCatalogItems.newBuilder().setProjectId(projectId).build());
+
+    PAssert.that(importCatalogItemResult.get(RecommendationAIImportCatalogItems.SUCCESS_TAG))
+        .satisfies(new VerifyCatalogItemResult(2, (String) catalogItem1.get("id")));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Checks that the output has the expected size and contains the expected catalog item id. */
+  private static class VerifyCatalogItemResult
+      implements SerializableFunction<Iterable<CatalogItem>, Void> {
+
+    private final String catalogItemId;
+    private final int size;
+
+    private VerifyCatalogItemResult(int size, String catalogItemId) {
+      this.size = size;
+      this.catalogItemId = catalogItemId;
+    }
+
+    @Override
+    public Void apply(Iterable<CatalogItem> input) {
+      List<String> ids = new ArrayList<>();
+      for (CatalogItem item : input) {
+        ids.add(item.getId());
+      }
+      assertTrue(ids.contains(catalogItemId));
+      assertEquals(size, ids.size());
+      return null;
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredictIT.java
 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredictIT.java
new file mode 100644
index 0000000..a39b0b1
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredictIT.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.json.GenericJson;
+import com.google.cloud.recommendationengine.v1beta1.PredictResponse;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration test for the Recommendations AI predict transform. */
+@RunWith(JUnit4.class)
+public class RecommendationAIPredictIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** Builds a "home-page-view" user event for visitor "1" as JSON. */
+  public static GenericJson getUserEvent() {
+    GenericJson visitor = new GenericJson();
+    visitor.set("visitorId", "1");
+
+    GenericJson event = new GenericJson();
+    event.set("eventType", "home-page-view");
+    event.set("userInfo", visitor);
+    return event;
+  }
+
+  /** Requests predictions for one user event and expects at least one result. */
+  @Test
+  public void predict() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+
+    Create.Values<GenericJson> source =
+        Create.of(Arrays.asList(getUserEvent())).withCoder(GenericJsonCoder.of(GenericJson.class));
+    PCollectionTuple predictResult =
+        testPipeline
+            .apply(source)
+            .apply(
+                RecommendationAIIO.predictAll()
+                    .withProjectId(projectId)
+                    .withPlacementId("recently_viewed_default"));
+
+    PAssert.that(predictResult.get(RecommendationAIPredict.SUCCESS_TAG))
+        .satisfies(new VerifyPredictResult());
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Checks that the output contains at least one prediction result. */
+  private static class VerifyPredictResult
+      implements SerializableFunction<Iterable<PredictResponse.PredictionResult>, Void> {
+
+    @Override
+    public Void apply(Iterable<PredictResponse.PredictionResult> input) {
+      List<PredictResponse.PredictionResult> results = new ArrayList<>();
+      for (PredictResponse.PredictionResult result : input) {
+        results.add(result);
+      }
+      assertTrue(results.size() > 0);
+      return null;
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIUserEventIT.java
 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIUserEventIT.java
new file mode 100644
index 0000000..3be60a8
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/RecommendationAIUserEventIT.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import com.google.api.client.json.GenericJson;
+import com.google.cloud.recommendationengine.v1beta1.UserEvent;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.junit.Ignore;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Integration tests for writing and importing Recommendations AI user events. */
+@RunWith(JUnit4.class)
+public class RecommendationAIUserEventIT {
+  @Rule public TestPipeline testPipeline = TestPipeline.create();
+
+  /** Builds a "detail-page-view" event for visitor "1" with one product detail (id "1"). */
+  public static GenericJson getUserEvent() {
+    GenericJson userInfo = new GenericJson().set("visitorId", "1");
+    List<GenericJson> productDetails = new ArrayList<>();
+    productDetails.add(new GenericJson().set("id", "1").set("quantity", 1));
+    GenericJson productEventDetail = new GenericJson().set("productDetails", productDetails);
+    return new GenericJson()
+        .set("eventType", "detail-page-view")
+        .set("userInfo", userInfo)
+        .set("productEventDetail", productEventDetail);
+  }
+
+  @Test
+  public void createUserEvent() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+    PCollectionTuple createUserEventResult =
+        testPipeline
+            .apply(
+                Create.of(Arrays.asList(getUserEvent()))
+                    .withCoder(GenericJsonCoder.of(GenericJson.class)))
+            .apply(RecommendationAIIO.writeUserEvent().withProjectId(projectId));
+    PAssert.that(createUserEventResult.get(RecommendationAIWriteUserEvent.SUCCESS_TAG))
+        .satisfies(new VerifyUserEventResult(1));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  @Ignore("Import method causing issues")
+  @Test
+  public void importUserEvents() {
+    String projectId = testPipeline.getOptions().as(GcpOptions.class).getProject();
+    List<KV<String, GenericJson>> userEvents = new ArrayList<>();
+    userEvents.add(KV.of("123", getUserEvent()));
+    userEvents.add(KV.of("123", getUserEvent()));
+
+    // NOTE(review): no explicit coder here, unlike createUserEvent -- confirm inference works.
+    PCollectionTuple importUserEventResult =
+        testPipeline
+            .apply(Create.of(userEvents))
+            .apply(RecommendationAIImportUserEvents.newBuilder().setProjectId(projectId).build());
+    // Read the import transform's own success tag, not the one owned by the write transform.
+    PAssert.that(importUserEventResult.get(RecommendationAIImportUserEvents.SUCCESS_TAG))
+        .satisfies(new VerifyUserEventResult(2));
+    testPipeline.run().waitUntilFinish();
+  }
+
+  /** Checks the number of user events and that the sample visitor id is among them. */
+  private static class VerifyUserEventResult
+      implements SerializableFunction<Iterable<UserEvent>, Void> {
+
+    private final int expectedSize;
+
+    private VerifyUserEventResult(int expectedSize) {
+      this.expectedSize = expectedSize;
+    }
+
+    @Override
+    public Void apply(Iterable<UserEvent> input) {
+      Object expectedVisitorId = ((GenericJson) getUserEvent().get("userInfo")).get("visitorId");
+      List<String> visitorIds = new ArrayList<>();
+      input.forEach(event -> visitorIds.add(event.getUserInfo().getVisitorId()));
+      assertTrue(visitorIds.contains(expectedVisitorId));
+      assertEquals(expectedSize, visitorIds.size());
+      return null;
+    }
+  }
+}

Reply via email to