pabloem commented on a change in pull request #13645:
URL: https://github.com/apache/beam/pull/13645#discussion_r619336944
##########
File path: sdks/java/extensions/ml/build.gradle
##########
@@ -47,6 +53,12 @@ dependencies {
testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
compile 'com.google.cloud:google-cloud-vision:1.99.3'
testCompile library.java.mockito_core
+ testCompile group: 'com.google.http-client', name: 'google-http-client',
version: '1.38.0'
Review comment:
please use `library.java.google_http_client`
##########
File path: sdks/java/extensions/ml/build.gradle
##########
@@ -32,13 +32,19 @@ dependencies {
compile project(path: ":sdks:java:core", configuration: "shadow")
compile project(":sdks:java:expansion-service")
permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761
+ compile group: 'com.google.http-client', name: 'google-http-client',
version: '1.38.0'
+ compile 'com.google.cloud:google-cloud-recommendations-ai:0.3.7'
compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
compile 'com.google.cloud:google-cloud-dlp:1.1.4'
compile 'com.google.cloud:google-cloud-language:1.99.4'
+ compile group: 'com.google.protobuf', name: 'protobuf-java-util', version:
'3.14.0'
+ compile group: 'org.json', name: 'json', version: '20201115'
compile 'com.google.api.grpc:proto-google-cloud-dlp-v2:1.1.4'
compile 'com.google.api.grpc:proto-google-cloud-language-v1:1.81.4'
compile
'com.google.api.grpc:proto-google-cloud-video-intelligence-v1:1.2.0'
compile 'com.google.api.grpc:proto-google-cloud-vision-v1:1.81.3'
+ compile
'com.google.api.grpc:proto-google-cloud-recommendations-ai-v1beta1:0.3.7'
+ compile "joda-time:joda-time:2.10.10"
Review comment:
also `library.java.joda_time`
##########
File path: sdks/java/extensions/ml/build.gradle
##########
@@ -47,6 +53,12 @@ dependencies {
testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
compile 'com.google.cloud:google-cloud-vision:1.99.3'
testCompile library.java.mockito_core
+ testCompile group: 'com.google.http-client', name: 'google-http-client',
version: '1.38.0'
+ testCompile group: 'com.google.protobuf', name: 'protobuf-java-util',
version: '3.14.0'
Review comment:
also `library.java.protobuf_java_util`
you can find these at
https://github.com/apache/beam/blob/master/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy
##########
File path:
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.CatalogInlineSource;
+import com.google.cloud.recommendationengine.v1beta1.CatalogItem;
+import com.google.cloud.recommendationengine.v1beta1.CatalogName;
+import com.google.cloud.recommendationengine.v1beta1.CatalogServiceClient;
+import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsRequest;
+import
com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsResponse;
+import com.google.cloud.recommendationengine.v1beta1.InputConfig;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} connecting to the Recommendations AI API
+ * (https://cloud.google.com/recommendations) and creating {@link
CatalogItem}s. *
+ *
+ * <p>Batch size defines how many items are created at once per batch (max:
5000).
+ *
+ * <p>The transform consumes {@link KV} of {@link String} and {@link
GenericJson}s (assumed to be
+ * the catalog item id as key and contents as value) and outputs a
PCollectionTuple which will
+ * contain the successfully created and failed catalog items.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the
catalog item (defaults
+ * to "default_catalog").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIImportCatalogItems
+ extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple>
{
+
+ public static final TupleTag<CatalogItem> SUCCESS_TAG = new
TupleTag<CatalogItem>() {};
+ public static final TupleTag<CatalogItem> FAILURE_TAG = new
TupleTag<CatalogItem>() {};
+
+ public static Builder newBuilder() {
+ return new AutoValue_RecommendationAIImportCatalogItems.Builder();
+ }
+
+ public abstract Builder toBuilder();
+
+ /** @return ID of Google Cloud project to be used for creating catalog
items. */
+ public abstract @Nullable String projectId();
+
+ /** @return Name of the catalog where the catalog items will be created. */
+ public abstract @Nullable String catalogName();
+
+ /** @return Size of input elements batch to be sent in one request. */
+ public abstract Integer batchSize();
+
+ /**
+ * @return Time limit (in processing time) on how long an incomplete batch
of elements is allowed
+ * to be buffered.
+ */
+ public abstract Duration maxBufferingDuration();
+
+ public RecommendationAIImportCatalogItems withProjectId(String projectId) {
+ return this.toBuilder().setProjectId(projectId).build();
+ }
+
+ public RecommendationAIImportCatalogItems withCatalogName(String
catalogName) {
+ return this.toBuilder().setCatalogName(catalogName).build();
+ }
+
+ public RecommendationAIImportCatalogItems withBatchSize(Integer batchSize) {
+ return this.toBuilder().setBatchSize(batchSize).build();
+ }
+
+ /**
+ * The transform converts the contents of input PCollection into {@link
CatalogItem}s and then
+ * calls the Recommendation AI service to create the catalog item.
+ *
+ * @param input input PCollection
+ * @return PCollection after transformations
+ */
+ @Override
+ public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) {
+ return input
+ .apply(
+ "Batch Contents",
+ GroupIntoBatches.<String, GenericJson>ofSize(batchSize())
+ .withMaxBufferingDuration(maxBufferingDuration()))
Review comment:
add a TODO: Consider adding autosharding for GroupIntoBatches
##########
File path:
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java
##########
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.EventStoreName;
+import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsRequest;
+import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsResponse;
+import com.google.cloud.recommendationengine.v1beta1.InputConfig;
+import com.google.cloud.recommendationengine.v1beta1.UserEvent;
+import com.google.cloud.recommendationengine.v1beta1.UserEventInlineSource;
+import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} connecting to the Recommendations AI API
+ * (https://cloud.google.com/recommendations) and creating {@link UserEvent}s.
*
+ *
+ * <p>Batch size defines how many items are at once per batch (max: 5000).
+ *
+ * <p>The transform consumes {@link KV} of {@link String} and {@link
GenericJson}s (assumed to be
+ * the user event id as key and contents as value) and outputs a
PCollectionTuple which will contain
+ * the successfully created and failed user events.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the
catalog item (defaults
+ * to "default_catalog"). It is possible to provide a event store to which you
want to add the user
+ * event (defaults to "default_event_store").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIImportUserEvents
+ extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple>
{
+
+ public static final TupleTag<UserEvent> SUCCESS_TAG = new
TupleTag<UserEvent>() {};
+ public static final TupleTag<UserEvent> FAILURE_TAG = new
TupleTag<UserEvent>() {};
+
+ public static Builder newBuilder() {
+ return new AutoValue_RecommendationAIImportUserEvents.Builder()
+ .setCatalogName("default_catalog")
+ .setEventStore("default_event_store");
+ }
+
+ public abstract Builder toBuilder();
+
+ /** @return ID of Google Cloud project to be used for creating user events.
*/
+ public abstract @Nullable String projectId();
+
+ /** @return Name of the catalog where the user events will be created. */
+ public abstract @Nullable String catalogName();
+
+ /** @return Name of the event store where the user events will be created. */
+ public abstract @Nullable String eventStore();
+
+ /** @return Size of input elements batch to be sent in one request. */
+ public abstract Integer batchSize();
+
+ /**
+ * @return Time limit (in processing time) on how long an incomplete batch
of elements is allowed
+ * to be buffered.
+ */
+ public abstract Duration maxBufferingDuration();
+
+ public RecommendationAIImportUserEvents withProjectId(String projectId) {
+ return this.toBuilder().setProjectId(projectId).build();
+ }
+
+ public RecommendationAIImportUserEvents withCatalogName(String catalogName) {
+ return this.toBuilder().setCatalogName(catalogName).build();
+ }
+
+ public RecommendationAIImportUserEvents withEventStore(String eventStore) {
+ return this.toBuilder().setEventStore(eventStore).build();
+ }
+
+ public RecommendationAIImportUserEvents withBatchSize(Integer batchSize) {
+ return this.toBuilder().setBatchSize(batchSize).build();
+ }
+
+ /**
+ * The transform converts the contents of input PCollection into {@link
UserEvent}s and then calls
+ * the Recommendation AI service to create the user event.
+ *
+ * @param input input PCollection
+ * @return PCollection after transformations
+ */
+ @Override
+ public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) {
Review comment:
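consider returning a dedicated result type instead of a PCollectionTuple, similar to BigQuery's WriteResult:
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java#L37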
##########
File path:
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.CatalogInlineSource;
+import com.google.cloud.recommendationengine.v1beta1.CatalogItem;
+import com.google.cloud.recommendationengine.v1beta1.CatalogName;
+import com.google.cloud.recommendationengine.v1beta1.CatalogServiceClient;
+import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsRequest;
+import
com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsResponse;
+import com.google.cloud.recommendationengine.v1beta1.InputConfig;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} connecting to the Recommendations AI API
+ * (https://cloud.google.com/recommendations) and creating {@link
CatalogItem}s. *
+ *
+ * <p>Batch size defines how many items are created at once per batch (max:
5000).
+ *
+ * <p>The transform consumes {@link KV} of {@link String} and {@link
GenericJson}s (assumed to be
+ * the catalog item id as key and contents as value) and outputs a
PCollectionTuple which will
+ * contain the successfully created and failed catalog items.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the
catalog item (defaults
+ * to "default_catalog").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIImportCatalogItems
+ extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple>
{
+
+ public static final TupleTag<CatalogItem> SUCCESS_TAG = new
TupleTag<CatalogItem>() {};
+ public static final TupleTag<CatalogItem> FAILURE_TAG = new
TupleTag<CatalogItem>() {};
+
+ public static Builder newBuilder() {
+ return new AutoValue_RecommendationAIImportCatalogItems.Builder();
+ }
+
+ public abstract Builder toBuilder();
+
+ /** @return ID of Google Cloud project to be used for creating catalog
items. */
+ public abstract @Nullable String projectId();
+
+ /** @return Name of the catalog where the catalog items will be created. */
+ public abstract @Nullable String catalogName();
+
+ /** @return Size of input elements batch to be sent in one request. */
+ public abstract Integer batchSize();
+
+ /**
+ * @return Time limit (in processing time) on how long an incomplete batch
of elements is allowed
+ * to be buffered.
+ */
+ public abstract Duration maxBufferingDuration();
+
+ public RecommendationAIImportCatalogItems withProjectId(String projectId) {
+ return this.toBuilder().setProjectId(projectId).build();
+ }
+
+ public RecommendationAIImportCatalogItems withCatalogName(String
catalogName) {
+ return this.toBuilder().setCatalogName(catalogName).build();
+ }
+
+ public RecommendationAIImportCatalogItems withBatchSize(Integer batchSize) {
+ return this.toBuilder().setBatchSize(batchSize).build();
+ }
+
+ /**
+ * The transform converts the contents of input PCollection into {@link
CatalogItem}s and then
+ * calls the Recommendation AI service to create the catalog item.
+ *
+ * @param input input PCollection
+ * @return PCollection after transformations
+ */
+ @Override
+ public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) {
+ return input
+ .apply(
+ "Batch Contents",
+ GroupIntoBatches.<String, GenericJson>ofSize(batchSize())
+ .withMaxBufferingDuration(maxBufferingDuration()))
Review comment:
consider using `withShardedKey()` on this GroupIntoBatches
##########
File path:
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java
##########
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.CatalogInlineSource;
+import com.google.cloud.recommendationengine.v1beta1.CatalogItem;
+import com.google.cloud.recommendationengine.v1beta1.CatalogName;
+import com.google.cloud.recommendationengine.v1beta1.CatalogServiceClient;
+import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsRequest;
+import
com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsResponse;
+import com.google.cloud.recommendationengine.v1beta1.InputConfig;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.GroupIntoBatches;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.joda.time.Duration;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} connecting to the Recommendations AI API
+ * (https://cloud.google.com/recommendations) and creating {@link
CatalogItem}s. *
+ *
+ * <p>Batch size defines how many items are created at once per batch (max:
5000).
+ *
+ * <p>The transform consumes {@link KV} of {@link String} and {@link
GenericJson}s (assumed to be
+ * the catalog item id as key and contents as value) and outputs a
PCollectionTuple which will
+ * contain the successfully created and failed catalog items.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the
catalog item (defaults
+ * to "default_catalog").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIImportCatalogItems
+ extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple>
{
+
+ public static final TupleTag<CatalogItem> SUCCESS_TAG = new
TupleTag<CatalogItem>() {};
+ public static final TupleTag<CatalogItem> FAILURE_TAG = new
TupleTag<CatalogItem>() {};
+
+ public static Builder newBuilder() {
+ return new AutoValue_RecommendationAIImportCatalogItems.Builder();
+ }
+
+ public abstract Builder toBuilder();
+
+ /** @return ID of Google Cloud project to be used for creating catalog
items. */
+ public abstract @Nullable String projectId();
+
+ /** @return Name of the catalog where the catalog items will be created. */
+ public abstract @Nullable String catalogName();
+
+ /** @return Size of input elements batch to be sent in one request. */
+ public abstract Integer batchSize();
+
+ /**
+ * @return Time limit (in processing time) on how long an incomplete batch
of elements is allowed
+ * to be buffered.
+ */
+ public abstract Duration maxBufferingDuration();
+
+ public RecommendationAIImportCatalogItems withProjectId(String projectId) {
+ return this.toBuilder().setProjectId(projectId).build();
+ }
+
+ public RecommendationAIImportCatalogItems withCatalogName(String
catalogName) {
+ return this.toBuilder().setCatalogName(catalogName).build();
+ }
+
+ public RecommendationAIImportCatalogItems withBatchSize(Integer batchSize) {
+ return this.toBuilder().setBatchSize(batchSize).build();
+ }
+
+ /**
+ * The transform converts the contents of input PCollection into {@link
CatalogItem}s and then
+ * calls the Recommendation AI service to create the catalog item.
+ *
+ * @param input input PCollection
+ * @return PCollection after transformations
+ */
+ @Override
+ public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) {
Review comment:
consider returning an ImportResult instead of a PCollectionTuple (similar to
what is done for BQ.Write:
https://github.com/apache/beam/blob/master/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/WriteResult.java#L37)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]