pabloem commented on a change in pull request #13645:
URL: https://github.com/apache/beam/pull/13645#discussion_r578855272



##########
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/BatchRequestForRecommendationAI.java
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Metrics;
+import org.apache.beam.sdk.state.BagState;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
+import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.Timer;
+import org.apache.beam.sdk.state.TimerSpec;
+import org.apache.beam.sdk.state.TimerSpecs;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.values.KV;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * DoFn that batches up GenericJson to reduce the number of requests made to 
the Recommendations
+ * API.
+ */
+class BatchRequestForRecommendationAI
+    extends DoFn<KV<String, GenericJson>, KV<String, Iterable<GenericJson>>> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BatchRequestForRecommendationAI.class);
+
+  private final Counter numberOfRowsBagged =
+      Metrics.counter(BatchRequestForRecommendationAI.class, 
"numberOfRowsBagged");
+
+  private final Integer maxBatchSize;
+
+  @StateId("elementsBag")
+  private final StateSpec<BagState<KV<String, GenericJson>>> elementsBag = 
StateSpecs.bag();
+
+  @TimerId("eventTimer")
+  private final TimerSpec eventTimer = TimerSpecs.timer(TimeDomain.EVENT_TIME);
+
+  /**
+   * Constructs the batching DoFn.
+   *
+   * @param maxBatchSize Desired batch size in bytes.
+   */
+  public BatchRequestForRecommendationAI(Integer maxBatchSize) {
+    this.maxBatchSize = maxBatchSize;
+  }
+
+  @ProcessElement
+  public void process(
+      @Element KV<String, GenericJson> element,
+      @StateId("elementsBag") BagState<KV<String, GenericJson>> elementsBag,
+      @TimerId("eventTimer") Timer eventTimer,
+      BoundedWindow w) {
+    elementsBag.add(element);
+    eventTimer.set(w.maxTimestamp());

Review comment:
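       Consider a streaming pipeline that streams elements in the global window
   (a simple pipeline that does not use windowing is fairly common). In that
   case `w.maxTimestamp()` is the end of the global window, which the watermark
   does not reach while an unbounded pipeline is running, so this timer would
   never fire, right?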
   
   Instead of this custom DoFn, I recommend using
   [GroupIntoBatches](https://beam.apache.org/releases/javadoc/2.27.0/org/apache/beam/sdk/transforms/GroupIntoBatches.html),
   which already implements this functionality and keeps receiving improvements
   (such as auto-sharded keys).

##########
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportCatalogItems.java
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.CatalogInlineSource;
+import com.google.cloud.recommendationengine.v1beta1.CatalogItem;
+import com.google.cloud.recommendationengine.v1beta1.CatalogName;
+import com.google.cloud.recommendationengine.v1beta1.CatalogServiceClient;
+import com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsRequest;
+import 
com.google.cloud.recommendationengine.v1beta1.ImportCatalogItemsResponse;
+import com.google.cloud.recommendationengine.v1beta1.InputConfig;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} connecting to the Recommendations AI API
+ * (https://cloud.google.com/recommendations) and creating {@link 
CatalogItem}s. *
+ *
+ * <p>Batch size defines how many items are created at once per batch (max: 
5000).
+ *
+ * <p>The transform consumes {@link KV} of {@link String} and {@link 
GenericJson}s (assumed to be
+ * the catalog item id as key and contents as value) and outputs a 
PCollectionTuple which will
+ * contain the successfully created and failed catalog items.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the 
catalog item (defaults
+ * to "default_catalog").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIImportCatalogItems
+    extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> 
{
+
+  public static final TupleTag<CatalogItem> SUCCESS_TAG = new 
TupleTag<CatalogItem>() {};
+  public static final TupleTag<CatalogItem> FAILURE_TAG = new 
TupleTag<CatalogItem>() {};
+
+  public static Builder newBuilder() {
+    return new AutoValue_RecommendationAIImportCatalogItems.Builder();
+  }
+
+  /** @return ID of Google Cloud project to be used for creating catalog 
items. */
+  public abstract String projectId();
+
+  /** @return Name of the catalog where the catalog items will be created. */
+  public abstract @Nullable String catalogName();
+
+  /** @return Size of input elements batch to be sent to Cloud DLP service in 
one request. */
+  public abstract Integer batchSize();
+
+  /**
+   * The transform converts the contents of input PCollection into {@link 
CatalogItem}s and then
+   * calls the Recommendation AI service to create the catalog item.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) {
+    return input
+        .apply("Batch Contents", ParDo.of(new 
BatchRequestForRecommendationAI(batchSize())))

Review comment:
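       You can change this as follows:
   
   ```suggestion
           .apply("Batch Contents", GroupIntoBatches.ofSize(batchSize())
               .withMaxBufferingDuration(SOMETHING)
               .withShardedKey())
   ```
   
   where `SOMETHING` is the maximum (processing-time) duration a partially filled
   batch may be buffered before it is emitted; it can be a default value or a
   user-supplied parameter (like `batchSize`). Note that `withMaxBufferingDuration`
   must be called before `withShardedKey()`, because `withShardedKey()` returns a
   `WithShardedKey` transform that has no further configuration methods. Its
   output is keyed by `ShardedKey<String>`, so the next step must accept
   `KV<ShardedKey<String>, Iterable<GenericJson>>` instead of
   `KV<String, Iterable<GenericJson>>`. Thoughts?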

##########
File path: 
sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/RecommendationAIImportUserEvents.java
##########
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.client.json.GenericJson;
+import com.google.api.gax.rpc.ApiException;
+import com.google.auto.value.AutoValue;
+import com.google.cloud.recommendationengine.v1beta1.EventStoreName;
+import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsRequest;
+import com.google.cloud.recommendationengine.v1beta1.ImportUserEventsResponse;
+import com.google.cloud.recommendationengine.v1beta1.InputConfig;
+import com.google.cloud.recommendationengine.v1beta1.UserEvent;
+import com.google.cloud.recommendationengine.v1beta1.UserEventInlineSource;
+import com.google.cloud.recommendationengine.v1beta1.UserEventServiceClient;
+import com.google.protobuf.util.JsonFormat;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollectionTuple;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TupleTagList;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.json.JSONObject;
+
+/**
+ * A {@link PTransform} connecting to the Recommendations AI API
+ * (https://cloud.google.com/recommendations) and creating {@link UserEvent}s. 
*
+ *
+ * <p>Batch size defines how many items are at once per batch (max: 5000).
+ *
+ * <p>The transform consumes {@link KV} of {@link String} and {@link 
GenericJson}s (assumed to be
+ * the user event id as key and contents as value) and outputs a 
PCollectionTuple which will contain
+ * the successfully created and failed user events.
+ *
+ * <p>It is possible to provide a catalog name to which you want to add the 
catalog item (defaults
+ * to "default_catalog"). It is possible to provide a event store to which you 
want to add the user
+ * event (defaults to "default_event_store").
+ */
+@AutoValue
+@SuppressWarnings({"nullness"})
+public abstract class RecommendationAIImportUserEvents
+    extends PTransform<PCollection<KV<String, GenericJson>>, PCollectionTuple> 
{
+
+  public static final TupleTag<UserEvent> SUCCESS_TAG = new 
TupleTag<UserEvent>() {};
+  public static final TupleTag<UserEvent> FAILURE_TAG = new 
TupleTag<UserEvent>() {};
+
+  public static Builder newBuilder() {
+    return new AutoValue_RecommendationAIImportUserEvents.Builder()
+        .setCatalogName("default_catalog")
+        .setEventStore("default_event_store");
+  }
+
+  /** @return ID of Google Cloud project to be used for creating user events. 
*/
+  public abstract String projectId();
+
+  /** @return Name of the catalog where the user events will be created. */
+  public abstract @Nullable String catalogName();
+
+  /** @return Name of the event store where the user events will be created. */
+  public abstract @Nullable String eventStore();
+
+  /** @return Size of input elements batch to be sent to Cloud DLP service in 
one request. */
+  public abstract Integer batchSize();
+
+  /**
+   * The transform converts the contents of input PCollection into {@link 
UserEvent}s and then calls
+   * the Recommendation AI service to create the user event.
+   *
+   * @param input input PCollection
+   * @return PCollection after transformations
+   */
+  @Override
+  public PCollectionTuple expand(PCollection<KV<String, GenericJson>> input) {
+    return input
+        .apply("Batch Contents", ParDo.of(new 
BatchRequestForRecommendationAI(batchSize())))

Review comment:
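       As in the previous file, try:
   
   ```suggestion
           .apply("Batch Contents", GroupIntoBatches.ofSize(batchSize())
               .withMaxBufferingDuration(SOMETHING)
               .withShardedKey())
   ```
   
   (`withMaxBufferingDuration` has to be called before `withShardedKey()`, and the
   next step must then accept `KV<ShardedKey<String>, Iterable<GenericJson>>`.)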




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to