pabloem commented on a change in pull request #13645: URL: https://github.com/apache/beam/pull/13645#discussion_r623498839
########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.CatalogInlineSource; +import com.google.cloud.recommendationengine.v1beta1.CatalogItem; +import com.google.cloud.recommendationengine.v1beta1.CatalogName; +import com.google.cloud.recommendationengine.v1beta1.CatalogServiceClient; +import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsRequest; +import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsResponse; +import com.google.cloud.recommendationengine.v1beta1.InputConfig; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import 
org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.json.JSONObject; + +/** + * A {@link PTransform} connecting to the Recommendations AI API + * (https://cloud.google.com/recommendations) and creating {@link CatalogItem}s. * + * + * <p>Batch size defines how many items are created at once per batch (max: 5000). + * + * <p>The transform consumes {@link KV} of {@link String} and {@link GenericJson}s (assumed to be + * the catalog item id as key and contents as value) and outputs a PCollectionTuple which will + * contain the successfully created and failed catalog items. + * + * <p>It is possible to provide a catalog name to which you want to add the catalog item (defaults + * to "default_catalog"). + */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIImportCatalogItems + extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> { + + public static final TupleTag<CatalogItem> SUCCESS_TAG = new TupleTag<CatalogItem>() {}; + public static final TupleTag<CatalogItem> FAILURE_TAG = new TupleTag<CatalogItem>() {}; + + public static Builder newBuilder() { + return new AutoValue_RecommendationAIImportCatalogItems.Builder(); + } Review comment: you can remove `public` from this builder so it can only be used within the package ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.EventStoreName; +import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsRequest; +import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsResponse; +import com.google.cloud.recommendationengine.v1beta1.InputConfig; +import com.google.cloud.recommendationengine.v1beta1.UserEvent; +import com.google.cloud.recommendationengine.v1beta1.UserEventInlineSource; +import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import 
org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.json.JSONObject; + +/** + * A {@link PTransform} connecting to the Recommendations AI API + * (https://cloud.google.com/recommendations) and creating {@link UserEvent}s. * + * + * <p>Batch size defines how many items are at once per batch (max: 5000). + * + * <p>The transform consumes {@link KV} of {@link String} and {@link GenericJson}s (assumed to be + * the user event id as key and contents as value) and outputs a PCollectionTuple which will contain + * the successfully created and failed user events. + * + * <p>It is possible to provide a catalog name to which you want to add the catalog item (defaults + * to "default_catalog"). It is possible to provide a event store to which you want to add the user + * event (defaults to "default_event_store"). + */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIImportUserEvents + extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> { + + public static final TupleTag<UserEvent> SUCCESS_TAG = new TupleTag<UserEvent>() {}; + public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {}; + + public static Builder newBuilder() { Review comment: you can remove `public` from this builder so it can only be used within the package ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.EventStoreName; +import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsRequest; +import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsResponse; +import com.google.cloud.recommendationengine.v1beta1.InputConfig; +import com.google.cloud.recommendationengine.v1beta1.UserEvent; +import com.google.cloud.recommendationengine.v1beta1.UserEventInlineSource; +import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.json.JSONObject; + +/** + * A {@link PTransform} connecting to the Recommendations AI API + * (https://cloud.google.com/recommendations) and 
creating {@link UserEvent}s. * + * + * <p>Batch size defines how many items are at once per batch (max: 5000). + * + * <p>The transform consumes {@link KV} of {@link String} and {@link GenericJson}s (assumed to be + * the user event id as key and contents as value) and outputs a PCollectionTuple which will contain + * the successfully created and failed user events. + * + * <p>It is possible to provide a catalog name to which you want to add the catalog item (defaults + * to "default_catalog"). It is possible to provide a event store to which you want to add the user + * event (defaults to "default_event_store"). + */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIImportUserEvents + extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> { + + public static final TupleTag<UserEvent> SUCCESS_TAG = new TupleTag<UserEvent>() {}; + public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {}; + + public static Builder newBuilder() { + return new AutoValue_RecommendationAIImportUserEvents.Builder() + .setCatalogName("default_catalog") + .setEventStore("default_event_store"); + } + + public abstract Builder toBuilder(); + + /** @return ID of Google Cloud project to be used for creating user events. */ + public abstract @Nullable String projectId(); + + /** @return Name of the catalog where the user events will be created. */ + public abstract @Nullable String catalogName(); + + /** @return Name of the event store where the user events will be created. */ + public abstract @Nullable String eventStore(); + + /** @return Size of input elements batch to be sent in one request. */ + public abstract Integer batchSize(); + + /** + * @return Time limit (in processing time) on how long an incomplete batch of elements is allowed + * to be buffered. 
+ */ + public abstract Duration maxBufferingDuration(); + + public RecommendationAIImportUserEvents withProjectId(String projectId) { + return this.toBuilder().setProjectId(projectId).build(); + } + + public RecommendationAIImportUserEvents withCatalogName(String catalogName) { + return this.toBuilder().setCatalogName(catalogName).build(); + } + + public RecommendationAIImportUserEvents withEventStore(String eventStore) { + return this.toBuilder().setEventStore(eventStore).build(); + } + + public RecommendationAIImportUserEvents withBatchSize(Integer batchSize) { + return this.toBuilder().setBatchSize(batchSize).build(); + } + + /** + * The transform converts the contents of input PCollection into {@link UserEvent}s and then calls + * the Recommendation AI service to create the user event. + * + * @param input input PCollection + * @return PCollection after transformations + */ + @Override + public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) { + return input + .apply( + "Batch Contents", + GroupIntoBatches.<String, GenericJson>ofSize(batchSize()) + .withMaxBufferingDuration(maxBufferingDuration()) + .withShardedKey()) + .apply( + "Import CatalogItems", + ParDo.of(new ImportUserEvents(projectId(), catalogName(), eventStore())) + .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG))); + } + + @AutoValue.Builder + public abstract static class Builder { Review comment: you can remove `public` from this builder so it can only be used within the package ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.EventStoreName; +import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsRequest; +import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsResponse; +import com.google.cloud.recommendationengine.v1beta1.InputConfig; +import com.google.cloud.recommendationengine.v1beta1.UserEvent; +import com.google.cloud.recommendationengine.v1beta1.UserEventInlineSource; +import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.nullness.qual.Nullable; +import 
org.joda.time.Duration; +import org.json.JSONObject; + +/** + * A {@link PTransform} connecting to the Recommendations AI API + * (https://cloud.google.com/recommendations) and creating {@link UserEvent}s. * + * + * <p>Batch size defines how many items are at once per batch (max: 5000). + * + * <p>The transform consumes {@link KV} of {@link String} and {@link GenericJson}s (assumed to be + * the user event id as key and contents as value) and outputs a PCollectionTuple which will contain + * the successfully created and failed user events. + * + * <p>It is possible to provide a catalog name to which you want to add the catalog item (defaults + * to "default_catalog"). It is possible to provide a event store to which you want to add the user + * event (defaults to "default_event_store"). + */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIImportUserEvents + extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> { + + public static final TupleTag<UserEvent> SUCCESS_TAG = new TupleTag<UserEvent>() {}; + public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {}; + + public static Builder newBuilder() { + return new AutoValue_RecommendationAIImportUserEvents.Builder() + .setCatalogName("default_catalog") + .setEventStore("default_event_store"); + } + + public abstract Builder toBuilder(); Review comment: you can remove `public` from this builder so it can only be used within the package ########## File path: sdks/java/extensions/ml/build.gradle ########## @@ -32,13 +32,19 @@ dependencies { compile project(path: ":sdks:java:core", configuration: "shadow") compile project(":sdks:java:expansion-service") permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761 + compile group: 'com.google.http-client', name: 'google-http-client', version: '1.38.0' Review comment: depend on http client from 
https://github.com/apache/beam/blob/master/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L545 instead, please (`library.java.google_http_client`) ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java ########## @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.EventStoreName; +import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsRequest; +import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsResponse; +import com.google.cloud.recommendationengine.v1beta1.InputConfig; +import com.google.cloud.recommendationengine.v1beta1.UserEvent; +import com.google.cloud.recommendationengine.v1beta1.UserEventInlineSource; +import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.json.JSONObject; + +/** + * A {@link PTransform} connecting to the Recommendations AI API + * (https://cloud.google.com/recommendations) and creating {@link UserEvent}s. * + * + * <p>Batch size defines how many items are at once per batch (max: 5000). + * + * <p>The transform consumes {@link KV} of {@link String} and {@link GenericJson}s (assumed to be + * the user event id as key and contents as value) and outputs a PCollectionTuple which will contain + * the successfully created and failed user events. 
+ * + * <p>It is possible to provide a catalog name to which you want to add the catalog item (defaults + * to "default_catalog"). It is possible to provide a event store to which you want to add the user + * event (defaults to "default_event_store"). + */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIImportUserEvents + extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> { + + public static final TupleTag<UserEvent> SUCCESS_TAG = new TupleTag<UserEvent>() {}; + public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {}; + + public static Builder newBuilder() { + return new AutoValue_RecommendationAIImportUserEvents.Builder() + .setCatalogName("default_catalog") + .setEventStore("default_event_store"); + } + + public abstract Builder toBuilder(); + + /** @return ID of Google Cloud project to be used for creating user events. */ + public abstract @Nullable String projectId(); + + /** @return Name of the catalog where the user events will be created. */ + public abstract @Nullable String catalogName(); + + /** @return Name of the event store where the user events will be created. */ + public abstract @Nullable String eventStore(); + + /** @return Size of input elements batch to be sent in one request. */ + public abstract Integer batchSize(); + + /** + * @return Time limit (in processing time) on how long an incomplete batch of elements is allowed + * to be buffered. 
+ */ + public abstract Duration maxBufferingDuration(); + + public RecommendationAIImportUserEvents withProjectId(String projectId) { + return this.toBuilder().setProjectId(projectId).build(); + } + + public RecommendationAIImportUserEvents withCatalogName(String catalogName) { + return this.toBuilder().setCatalogName(catalogName).build(); + } + + public RecommendationAIImportUserEvents withEventStore(String eventStore) { + return this.toBuilder().setEventStore(eventStore).build(); + } + + public RecommendationAIImportUserEvents withBatchSize(Integer batchSize) { + return this.toBuilder().setBatchSize(batchSize).build(); + } + + /** + * The transform converts the contents of input PCollection into {@link UserEvent}s and then calls + * the Recommendation AI service to create the user event. + * + * @param input input PCollection + * @return PCollection after transformations + */ + @Override + public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) { + return input + .apply( + "Batch Contents", + GroupIntoBatches.<String, GenericJson>ofSize(batchSize()) + .withMaxBufferingDuration(maxBufferingDuration()) + .withShardedKey()) + .apply( + "Import CatalogItems", + ParDo.of(new ImportUserEvents(projectId(), catalogName(), eventStore())) + .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG))); + } + + @AutoValue.Builder + public abstract static class Builder { + /** @param projectId ID of Google Cloud project to be used for creating user events. */ + public abstract Builder setProjectId(@Nullable String projectId); + + /** @param catalogName Name of the catalog where the user events will be created. */ + public abstract Builder setCatalogName(@Nullable String catalogName); + + /** @param eventStore Name of the event store where the user events will be created. */ + public abstract Builder setEventStore(@Nullable String eventStore); + + /** + * @param batchSize Amount of input elements to be sent to Recommendation AI service in one + * request. 
+ */ + public abstract Builder setBatchSize(Integer batchSize); + + /** + * @param maxBufferingDuration Time limit (in processing time) on how long an incomplete batch + * of elements is allowed to be buffered. + */ + public abstract Builder setMaxBufferingDuration(Duration maxBufferingDuration); + + public abstract RecommendationAIImportUserEvents build(); + } + + static class ImportUserEvents + extends DoFn<KV<ShardedKey<String>, Iterable<GenericJson>>, UserEvent> { + private final String projectId; + private final String catalogName; + private final String eventStore; + + /** + * @param projectId ID of GCP project to be used for creating user events. + * @param catalogName Catalog name for UserEvent creation. + * @param eventStore Event store name for UserEvent creation. + */ + public ImportUserEvents(String projectId, String catalogName, String eventStore) { Review comment: you can remove `public` from this constructor so it can only be used within the package ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIWriteUserEvent.java ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.EventStoreName; +import com.google.cloud.recommendationengine.v1beta1.UserEvent; +import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.json.JSONObject; + +/** + * A {@link PTransform} using the Recommendations AI API (https://cloud.google.com/recommendations). + * Takes an input {@link PCollection} of {@link GenericJson}s and converts them to and creates + * {@link UserEvent}s. + * + * <p>It is possible to provide a catalog name to which you want to add the user event (defaults to + * "default_catalog"). It is possible to provide a event store to which you want to add the user + * event (defaults to "default_event_store"). 
+ */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIWriteUserEvent + extends PTransform<PCollection<GenericJson>, PCollectionTuple> { + + public static final TupleTag<UserEvent> SUCCESS_TAG = new TupleTag<UserEvent>() {}; + public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {}; + + public static Builder newBuilder() { + return new AutoValue_RecommendationAIWriteUserEvent.Builder() + .setCatalogName("default_catalog") + .setEventStore("default_event_store"); + } + + /** @return ID of Google Cloud project to be used for creating user events. */ + public abstract @Nullable String projectId(); + + /** @return Name of the catalog where the user events will be created. */ + public abstract @Nullable String catalogName(); + + /** @return Name of the event store where the user events will be created. */ + public abstract @Nullable String eventStore(); + + /** + * The transform converts the contents of input PCollection into {@link UserEvent}s and then calls + * the Recommendation AI service to create the user event. + * + * @param input input PCollection + * @return PCollectionTuple with successful and failed {@link UserEvent}s + */ + @Override + public PCollectionTuple expand(PCollection<GenericJson> input) { + return input.apply( + ParDo.of(new WriteUserEvent(projectId(), catalogName(), eventStore())) + .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG))); + } + + @AutoValue.Builder + public abstract static class Builder { + /** @param projectId ID of Google Cloud project to be used for creating user events. */ + public abstract Builder setProjectId(@Nullable String projectId); + + /** @param catalogName Name of the catalog where the user events will be created. */ + public abstract Builder setCatalogName(@Nullable String catalogName); + + /** @param eventStore Name of the event store where the user events will be created. 
*/ + public abstract Builder setEventStore(@Nullable String eventStore); + + public abstract RecommendationAIWriteUserEvent build(); + } + + public abstract Builder toBuilder(); Review comment: you can remove `public` from `toBuilder()` so it can only be used within the package ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.CatalogInlineSource; +import com.google.cloud.recommendationengine.v1beta1.CatalogItem; +import com.google.cloud.recommendationengine.v1beta1.CatalogName; +import com.google.cloud.recommendationengine.v1beta1.CatalogServiceClient; +import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsRequest; +import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsResponse; +import com.google.cloud.recommendationengine.v1beta1.InputConfig; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.json.JSONObject; + +/** + * A {@link PTransform} connecting to the Recommendations AI API + * (https://cloud.google.com/recommendations) and creating {@link CatalogItem}s. * + * + * <p>Batch size defines how many items are created at once per batch (max: 5000). + * + * <p>The transform consumes {@link KV} of {@link String} and {@link GenericJson}s (assumed to be + * the catalog item id as key and contents as value) and outputs a PCollectionTuple which will + * contain the successfully created and failed catalog items. 
+ * + * <p>It is possible to provide a catalog name to which you want to add the catalog item (defaults + * to "default_catalog"). + */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIImportCatalogItems + extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> { + + public static final TupleTag<CatalogItem> SUCCESS_TAG = new TupleTag<CatalogItem>() {}; + public static final TupleTag<CatalogItem> FAILURE_TAG = new TupleTag<CatalogItem>() {}; + + public static Builder newBuilder() { + return new AutoValue_RecommendationAIImportCatalogItems.Builder(); + } + + public abstract Builder toBuilder(); + + /** @return ID of Google Cloud project to be used for creating catalog items. */ + public abstract @Nullable String projectId(); + + /** @return Name of the catalog where the catalog items will be created. */ + public abstract @Nullable String catalogName(); + + /** @return Size of input elements batch to be sent in one request. */ + public abstract Integer batchSize(); + + /** + * @return Time limit (in processing time) on how long an incomplete batch of elements is allowed + * to be buffered. + */ + public abstract Duration maxBufferingDuration(); + + public RecommendationAIImportCatalogItems withProjectId(String projectId) { + return this.toBuilder().setProjectId(projectId).build(); + } + + public RecommendationAIImportCatalogItems withCatalogName(String catalogName) { + return this.toBuilder().setCatalogName(catalogName).build(); + } + + public RecommendationAIImportCatalogItems withBatchSize(Integer batchSize) { + return this.toBuilder().setBatchSize(batchSize).build(); + } + + /** + * The transform converts the contents of input PCollection into {@link CatalogItem}s and then + * calls the Recommendation AI service to create the catalog item. 
+ * + * @param input input PCollection + * @return PCollection after transformations + */ + @Override + public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) { + return input + .apply( + "Batch Contents", + GroupIntoBatches.<String, GenericJson>ofSize(batchSize()) + .withMaxBufferingDuration(maxBufferingDuration()) + .withShardedKey()) + .apply( + "Import CatalogItems", + ParDo.of(new ImportCatalogItems(projectId(), catalogName())) + .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG))); + } + + @AutoValue.Builder + public abstract static class Builder { + /** @param projectId ID of Google Cloud project to be used for creating catalog items. */ + public abstract Builder setProjectId(@Nullable String projectId); + + /** @param catalogName Name of the catalog where the catalog items will be created. */ + public abstract Builder setCatalogName(@Nullable String catalogName); + + /** + * @param batchSize Amount of input elements to be sent to Recommendation AI service in one + * request. + */ + public abstract Builder setBatchSize(Integer batchSize); + + /** + * @param maxBufferingDuration Time limit (in processing time) on how long an incomplete batch + * of elements is allowed to be buffered. + */ + public abstract Builder setMaxBufferingDuration(Duration maxBufferingDuration); + + public abstract RecommendationAIImportCatalogItems build(); + } + + static class ImportCatalogItems + extends DoFn<KV<ShardedKey<String>, Iterable<GenericJson>>, CatalogItem> { + private final String projectId; + private final String catalogName; + + /** + * @param projectId ID of GCP project to be used for creating catalog items. + * @param catalogName Catalog name for CatalogItem creation. 
+ */ +  public ImportCatalogItems(String projectId, String catalogName) { Review comment: you can remove `public` from this constructor so it can only be used within the package, and users won't access it ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java ########## @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.CatalogInlineSource; +import com.google.cloud.recommendationengine.v1beta1.CatalogItem; +import com.google.cloud.recommendationengine.v1beta1.CatalogName; +import com.google.cloud.recommendationengine.v1beta1.CatalogServiceClient; +import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsRequest; +import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsResponse; +import com.google.cloud.recommendationengine.v1beta1.InputConfig; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.util.ArrayList; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.GroupIntoBatches; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.util.ShardedKey; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; +import org.json.JSONObject; + +/** + * A {@link PTransform} connecting to the Recommendations AI API + * (https://cloud.google.com/recommendations) and creating {@link CatalogItem}s. * + * + * <p>Batch size defines how many items are created at once per batch (max: 5000). + * + * <p>The transform consumes {@link KV} of {@link String} and {@link GenericJson}s (assumed to be + * the catalog item id as key and contents as value) and outputs a PCollectionTuple which will + * contain the successfully created and failed catalog items. 
+ * + * <p>It is possible to provide a catalog name to which you want to add the catalog item (defaults + * to "default_catalog"). + */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIImportCatalogItems + extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> { + + public static final TupleTag<CatalogItem> SUCCESS_TAG = new TupleTag<CatalogItem>() {}; + public static final TupleTag<CatalogItem> FAILURE_TAG = new TupleTag<CatalogItem>() {}; + + public static Builder newBuilder() { + return new AutoValue_RecommendationAIImportCatalogItems.Builder(); + } + + public abstract Builder toBuilder(); + + /** @return ID of Google Cloud project to be used for creating catalog items. */ + public abstract @Nullable String projectId(); + + /** @return Name of the catalog where the catalog items will be created. */ + public abstract @Nullable String catalogName(); + + /** @return Size of input elements batch to be sent in one request. */ + public abstract Integer batchSize(); + + /** + * @return Time limit (in processing time) on how long an incomplete batch of elements is allowed + * to be buffered. + */ + public abstract Duration maxBufferingDuration(); + + public RecommendationAIImportCatalogItems withProjectId(String projectId) { + return this.toBuilder().setProjectId(projectId).build(); + } + + public RecommendationAIImportCatalogItems withCatalogName(String catalogName) { + return this.toBuilder().setCatalogName(catalogName).build(); + } + + public RecommendationAIImportCatalogItems withBatchSize(Integer batchSize) { + return this.toBuilder().setBatchSize(batchSize).build(); + } + + /** + * The transform converts the contents of input PCollection into {@link CatalogItem}s and then + * calls the Recommendation AI service to create the catalog item. 
+ * + * @param input input PCollection + * @return PCollection after transformations + */ + @Override + public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) { + return input + .apply( + "Batch Contents", + GroupIntoBatches.<String, GenericJson>ofSize(batchSize()) + .withMaxBufferingDuration(maxBufferingDuration()) + .withShardedKey()) + .apply( + "Import CatalogItems", + ParDo.of(new ImportCatalogItems(projectId(), catalogName())) + .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG))); + } + + @AutoValue.Builder + public abstract static class Builder { Review comment: you can remove `public` from this builder so it can only be used within the package ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredict.java ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.PlacementName; +import com.google.cloud.recommendationengine.v1beta1.PredictResponse; +import com.google.cloud.recommendationengine.v1beta1.PredictionServiceClient; +import com.google.cloud.recommendationengine.v1beta1.UserEvent; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.json.JSONObject; + +/** + * A {@link PTransform} using the Recommendations AI API (https://cloud.google.com/recommendations). + * Takes an input {@link PCollection} of {@link GenericJson}s and creates {@link + * PredictResponse.PredictionResult}s. + * + * <p>It is possible to provide a catalog name to which you want to add the user event (defaults to + * "default_catalog"). It is possible to provide a event store to which you want to add the user + * event (defaults to "default_event_store"). A placement id for the recommendation engine placement + * to be used. + */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIPredict + extends PTransform<PCollection<GenericJson>, PCollectionTuple> { + + /** @return ID of Google Cloud project to be used for creating catalog items. */ + public abstract @Nullable String projectId(); + + /** @return Name of the catalog where the catalog items will be created. 
*/ + public abstract @Nullable String catalogName(); + + /** @return Name of the event store where the user events will be created. */ + public abstract @Nullable String eventStore(); + + /** @return ID of the recommendation engine placement. */ + public abstract String placementId(); + + public static final TupleTag<PredictResponse.PredictionResult> SUCCESS_TAG = + new TupleTag<PredictResponse.PredictionResult>() {}; + + public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {}; + + @AutoValue.Builder + public abstract static class Builder { Review comment: you can remove `public` from this builder so it can only be used within the package ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIWriteUserEvent.java ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.EventStoreName; +import com.google.cloud.recommendationengine.v1beta1.UserEvent; +import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.json.JSONObject; + +/** + * A {@link PTransform} using the Recommendations AI API (https://cloud.google.com/recommendations). + * Takes an input {@link PCollection} of {@link GenericJson}s and converts them to and creates + * {@link UserEvent}s. + * + * <p>It is possible to provide a catalog name to which you want to add the user event (defaults to + * "default_catalog"). It is possible to provide a event store to which you want to add the user + * event (defaults to "default_event_store"). 
+ */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIWriteUserEvent + extends PTransform<PCollection<GenericJson>, PCollectionTuple> { + + public static final TupleTag<UserEvent> SUCCESS_TAG = new TupleTag<UserEvent>() {}; + public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {}; + + public static Builder newBuilder() { + return new AutoValue_RecommendationAIWriteUserEvent.Builder() + .setCatalogName("default_catalog") + .setEventStore("default_event_store"); + } + + /** @return ID of Google Cloud project to be used for creating user events. */ + public abstract @Nullable String projectId(); + + /** @return Name of the catalog where the user events will be created. */ + public abstract @Nullable String catalogName(); + + /** @return Name of the event store where the user events will be created. */ + public abstract @Nullable String eventStore(); + + /** + * The transform converts the contents of input PCollection into {@link UserEvent}s and then calls + * the Recommendation AI service to create the user event. + * + * @param input input PCollection + * @return PCollectionTuple with successful and failed {@link UserEvent}s + */ + @Override + public PCollectionTuple expand(PCollection<GenericJson> input) { + return input.apply( + ParDo.of(new WriteUserEvent(projectId(), catalogName(), eventStore())) + .withOutputTags(SUCCESS_TAG, TupleTagList.of(FAILURE_TAG))); + } + + @AutoValue.Builder + public abstract static class Builder { Review comment: you can remove `public` from this builder so it can only be used within the package ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIWriteUserEvent.java ########## @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.EventStoreName; +import com.google.cloud.recommendationengine.v1beta1.UserEvent; +import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import java.util.concurrent.ExecutionException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.json.JSONObject; + +/** + * A {@link PTransform} using the Recommendations AI API (https://cloud.google.com/recommendations). + * Takes an input {@link PCollection} of {@link GenericJson}s and converts them to and creates + * {@link UserEvent}s. + * + * <p>It is possible to provide a catalog name to which you want to add the user event (defaults to + * "default_catalog"). It is possible to provide a event store to which you want to add the user + * event (defaults to "default_event_store"). 
+ */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIWriteUserEvent + extends PTransform<PCollection<GenericJson>, PCollectionTuple> { + + public static final TupleTag<UserEvent> SUCCESS_TAG = new TupleTag<UserEvent>() {}; + public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {}; + + public static Builder newBuilder() { Review comment: you can remove `public` from this builder so it can only be used within the package ########## File path: sdks/java/extensions/ml/build.gradle ########## @@ -32,13 +32,19 @@ dependencies { compile project(path: ":sdks:java:core", configuration: "shadow") compile project(":sdks:java:expansion-service") permitUnusedDeclared project(":sdks:java:expansion-service") // BEAM-11761 + compile group: 'com.google.http-client', name: 'google-http-client', version: '1.38.0' + compile 'com.google.cloud:google-cloud-recommendations-ai:0.3.7' compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0' compile 'com.google.cloud:google-cloud-dlp:1.1.4' compile 'com.google.cloud:google-cloud-language:1.99.4' + compile group: 'com.google.protobuf', name: 'protobuf-java-util', version: '3.14.0' Review comment: depend on protobuf from here: https://github.com/apache/beam/blob/master/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy#L613 ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredict.java ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.PlacementName; +import com.google.cloud.recommendationengine.v1beta1.PredictResponse; +import com.google.cloud.recommendationengine.v1beta1.PredictionServiceClient; +import com.google.cloud.recommendationengine.v1beta1.UserEvent; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.json.JSONObject; + +/** + * A {@link PTransform} using the Recommendations AI API (https://cloud.google.com/recommendations). + * Takes an input {@link PCollection} of {@link GenericJson}s and creates {@link + * PredictResponse.PredictionResult}s. + * + * <p>It is possible to provide a catalog name to which you want to add the user event (defaults to + * "default_catalog"). It is possible to provide a event store to which you want to add the user + * event (defaults to "default_event_store"). A placement id for the recommendation engine placement + * to be used. 
+ */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIPredict + extends PTransform<PCollection<GenericJson>, PCollectionTuple> { + + /** @return ID of Google Cloud project to be used for creating catalog items. */ + public abstract @Nullable String projectId(); + + /** @return Name of the catalog where the catalog items will be created. */ + public abstract @Nullable String catalogName(); + + /** @return Name of the event store where the user events will be created. */ + public abstract @Nullable String eventStore(); + + /** @return ID of the recommendation engine placement. */ + public abstract String placementId(); + + public static final TupleTag<PredictResponse.PredictionResult> SUCCESS_TAG = + new TupleTag<PredictResponse.PredictionResult>() {}; + + public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {}; + + @AutoValue.Builder + public abstract static class Builder { + /** @param projectId ID of Google Cloud project to be used for creating catalog items. */ + public abstract Builder setProjectId(@Nullable String projectId); + + /** @param catalogName Name of the catalog where the catalog items will be created. */ + public abstract Builder setCatalogName(@Nullable String catalogName); + + /** @param eventStore Name of the event store where the user events will be created. */ + public abstract Builder setEventStore(@Nullable String eventStore); + + /** @param placementId of the recommendation engine placement. 
*/ + public abstract Builder setPlacementId(String placementId); + + public abstract RecommendationAIPredict build(); + } + + public static Builder newBuilder() { + return new AutoValue_RecommendationAIPredict.Builder() + .setCatalogName("default_catalog") + .setEventStore("default_event_store"); + } + + public abstract Builder toBuilder(); Review comment: you can remove `public` from this builder so it can only be used within the package ########## File path: sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIPredict.java ########## @@ -0,0 +1,155 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.client.json.GenericJson; +import com.google.api.gax.rpc.ApiException; +import com.google.auto.value.AutoValue; +import com.google.cloud.recommendationengine.v1beta1.PlacementName; +import com.google.cloud.recommendationengine.v1beta1.PredictResponse; +import com.google.cloud.recommendationengine.v1beta1.PredictionServiceClient; +import com.google.cloud.recommendationengine.v1beta1.UserEvent; +import com.google.protobuf.util.JsonFormat; +import java.io.IOException; +import javax.annotation.Nullable; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.json.JSONObject; + +/** + * A {@link PTransform} using the Recommendations AI API (https://cloud.google.com/recommendations). + * Takes an input {@link PCollection} of {@link GenericJson}s and creates {@link + * PredictResponse.PredictionResult}s. + * + * <p>It is possible to provide a catalog name to which you want to add the user event (defaults to + * "default_catalog"). It is possible to provide a event store to which you want to add the user + * event (defaults to "default_event_store"). A placement id for the recommendation engine placement + * to be used. + */ +@AutoValue +@SuppressWarnings({"nullness"}) +public abstract class RecommendationAIPredict + extends PTransform<PCollection<GenericJson>, PCollectionTuple> { + + /** @return ID of Google Cloud project to be used for creating catalog items. */ + public abstract @Nullable String projectId(); + + /** @return Name of the catalog where the catalog items will be created. 
*/ + public abstract @Nullable String catalogName(); + + /** @return Name of the event store where the user events will be created. */ + public abstract @Nullable String eventStore(); + + /** @return ID of the recommendation engine placement. */ + public abstract String placementId(); + + public static final TupleTag<PredictResponse.PredictionResult> SUCCESS_TAG = + new TupleTag<PredictResponse.PredictionResult>() {}; + + public static final TupleTag<UserEvent> FAILURE_TAG = new TupleTag<UserEvent>() {}; + + @AutoValue.Builder + public abstract static class Builder { + /** @param projectId ID of Google Cloud project to be used for creating catalog items. */ + public abstract Builder setProjectId(@Nullable String projectId); + + /** @param catalogName Name of the catalog where the catalog items will be created. */ + public abstract Builder setCatalogName(@Nullable String catalogName); + + /** @param eventStore Name of the event store where the user events will be created. */ + public abstract Builder setEventStore(@Nullable String eventStore); + + /** @param placementId of the recommendation engine placement. */ + public abstract Builder setPlacementId(String placementId); + + public abstract RecommendationAIPredict build(); + } + + public static Builder newBuilder() { Review comment: you can remove `public` from this builder so it can only be used within the package -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
