This is an automated email from the ASF dual-hosted git repository.

mwalenia pushed a commit to branch BEAM-9147-videointelligence
in repository https://gitbox.apache.org/repos/asf/beam.git

commit a7f55e60c1cfe39112331dd4e2b34bc522a2c77a
Author: Michal Walenia <[email protected]>
AuthorDate: Tue Mar 24 15:41:31 2020 +0100

    [BEAM-9147] Add VideoIntelligence transform
---
 sdks/java/extensions/ml/build.gradle               |  34 ++++++
 .../beam/sdk/extensions/ml/AnnotateVideo.java      |  81 +++++++++++++
 .../beam/sdk/extensions/ml/VideoIntelligence.java  | 126 +++++++++++++++++++++
 .../beam/sdk/extensions/ml/AnnotateVideoTest.java  |  73 ++++++++++++
 .../org.mockito.plugins.MockMaker                  |   1 +
 settings.gradle                                    |   3 +
 6 files changed, 318 insertions(+)

diff --git a/sdks/java/extensions/ml/build.gradle 
b/sdks/java/extensions/ml/build.gradle
new file mode 100644
index 0000000..482decc
--- /dev/null
+++ b/sdks/java/extensions/ml/build.gradle
@@ -0,0 +1,34 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+plugins { id 'org.apache.beam.module' }
+applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.protobuf')
+
+description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML'
+
+dependencies {
+    compile project(path: ":sdks:java:core", configuration: "shadow")
+    compile project(":sdks:java:expansion-service")
+    testCompile project(path: ':sdks:java:core', configuration: 'shadowTest')
+    compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+    testCompile library.java.mockito_core
+    testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0'
+    testCompile library.java.junit
+}
diff --git 
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
new file mode 100644
index 0000000..edb5c04
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.videointelligence.v1.*;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.PCollectionView;
+
+public abstract class AnnotateVideo<T> extends DoFn<T, 
List<VideoAnnotationResults>> {
+
+  protected final PCollectionView<Map<T, VideoContext>> contextSideInput;
+  protected final List<Feature> featureList;
+  VideoIntelligenceServiceClient videoIntelligenceServiceClient;
+
+  public AnnotateVideo(
+      PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> 
featureList) {
+    this.contextSideInput = contextSideInput;
+    this.featureList = featureList;
+  }
+
+  public AnnotateVideo(List<Feature> featureList) {
+    contextSideInput = null;
+    this.featureList = featureList;
+  }
+
+  @StartBundle
+  public void startBundle() throws IOException {
+    videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create();
+  }
+
+  @Teardown
+  public void teardown() {
+    videoIntelligenceServiceClient.close();
+  }
+
+  List<VideoAnnotationResults> getVideoAnnotationResults(
+      String elementURI, ByteString elementContents, VideoContext videoContext)
+      throws InterruptedException, ExecutionException {
+    AnnotateVideoRequest.Builder requestBuilder =
+        AnnotateVideoRequest.newBuilder().addAllFeatures(featureList);
+    if (elementURI != null) {
+      requestBuilder.setInputUri(elementURI);
+    } else if (elementContents != null) {
+      requestBuilder.setInputContent(elementContents);
+    } else {
+      throw new IllegalArgumentException("Either elementURI or elementContents 
should be non-null");
+    }
+    if (videoContext != null) {
+      requestBuilder.setVideoContext(videoContext);
+    }
+    AnnotateVideoRequest annotateVideoRequest = requestBuilder.build();
+    OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> 
annotateVideoAsync =
+        
videoIntelligenceServiceClient.annotateVideoAsync(annotateVideoRequest);
+    return annotateVideoAsync.get().getAnnotationResultsList();
+  }
+
+  @ProcessElement
+  public abstract void processElement(ProcessContext context)
+      throws ExecutionException, InterruptedException;
+}
diff --git 
a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
new file mode 100644
index 0000000..4b01de0
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import com.google.cloud.videointelligence.v1.*;
+import com.google.protobuf.ByteString;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollectionView;
+
+public class VideoIntelligence {
+
+  public static AnnotateVideoFromURI annotateFromURI(
+      List<Feature> featureList, PCollectionView<Map<String, VideoContext>> 
contextSideInput) {
+    return new AnnotateVideoFromURI(contextSideInput, featureList);
+  }
+
+  public static AnnotateVideoFromBytes annotateFromBytes(
+      PCollectionView<Map<ByteString, VideoContext>> contextSideInput, 
List<Feature> featureList) {
+    return new AnnotateVideoFromBytes(contextSideInput, featureList);
+  }
+
+  public static AnnotateVideoURIWithContext 
annotateFromUriWithContext(List<Feature> featureList) {
+    return new AnnotateVideoURIWithContext(featureList);
+  }
+
+  public static AnnotateVideoBytesWithContext annotateFromBytesWithContext(
+      List<Feature> featureList) {
+    return new AnnotateVideoBytesWithContext(featureList);
+  }
+
+  public static class AnnotateVideoFromURI extends AnnotateVideo<String> {
+
+    public AnnotateVideoFromURI(
+        PCollectionView<Map<String, VideoContext>> contextSideInput, 
List<Feature> featureList) {
+      super(contextSideInput, featureList);
+    }
+
+    @Override
+    public void processElement(ProcessContext context)
+        throws ExecutionException, InterruptedException {
+      String elementURI = context.element();
+      VideoContext videoContext = null;
+      if (contextSideInput != null) {
+        videoContext = context.sideInput(contextSideInput).get(elementURI);
+      }
+      List<VideoAnnotationResults> annotationResultsList =
+          getVideoAnnotationResults(elementURI, null, videoContext);
+      context.output(annotationResultsList);
+    }
+  }
+
+  public static class AnnotateVideoFromBytes extends AnnotateVideo<ByteString> 
{
+
+    public AnnotateVideoFromBytes(
+        PCollectionView<Map<ByteString, VideoContext>> contextSideInput,
+        List<Feature> featureList) {
+      super(contextSideInput, featureList);
+    }
+
+    @Override
+    public void processElement(ProcessContext context)
+        throws ExecutionException, InterruptedException {
+      ByteString element = context.element();
+      VideoContext videoContext = null;
+      if (contextSideInput != null) {
+        videoContext = context.sideInput(contextSideInput).get(element);
+      }
+      List<VideoAnnotationResults> videoAnnotationResults =
+          getVideoAnnotationResults(null, element, videoContext);
+      context.output(videoAnnotationResults);
+    }
+  }
+
+  public static class AnnotateVideoURIWithContext extends 
AnnotateVideo<KV<String, VideoContext>> {
+
+    public AnnotateVideoURIWithContext(List<Feature> featureList) {
+      super(featureList);
+    }
+
+    @Override
+    public void processElement(ProcessContext context)
+        throws ExecutionException, InterruptedException {
+      String elementURI = context.element().getKey();
+      VideoContext videoContext = context.element().getValue();
+      List<VideoAnnotationResults> videoAnnotationResults =
+          getVideoAnnotationResults(elementURI, null, videoContext);
+      context.output(videoAnnotationResults);
+    }
+  }
+
+  public static class AnnotateVideoBytesWithContext
+      extends AnnotateVideo<KV<ByteString, VideoContext>> {
+
+    public AnnotateVideoBytesWithContext(List<Feature> featureList) {
+      super(featureList);
+    }
+
+    @Override
+    public void processElement(ProcessContext context)
+        throws ExecutionException, InterruptedException {
+      ByteString element = context.element().getKey();
+      VideoContext videoContext = context.element().getValue();
+      List<VideoAnnotationResults> videoAnnotationResults =
+          getVideoAnnotationResults(null, element, videoContext);
+      context.output(videoAnnotationResults);
+    }
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
new file mode 100644
index 0000000..57400e4
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.ml;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.when;
+
+import com.google.api.gax.longrunning.OperationFuture;
+import com.google.cloud.videointelligence.v1.AnnotateVideoProgress;
+import com.google.cloud.videointelligence.v1.AnnotateVideoResponse;
+import com.google.cloud.videointelligence.v1.Feature;
+import com.google.cloud.videointelligence.v1.VideoAnnotationResults;
+import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient;
+import com.google.protobuf.ByteString;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class AnnotateVideoTest {
+
+  private static final String TEST_URI = "fake_uri";
+  private static final ByteString TEST_BYTES = 
ByteString.copyFromUtf8("12345");
+
+  @Mock private VideoIntelligenceServiceClient serviceClient;
+  @Mock private OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> 
future;
+  @Mock private AnnotateVideoResponse response;
+
+  @Test
+  public void shouldReturnAListOfAnnotations() throws ExecutionException, 
InterruptedException {
+    when(response.getAnnotationResultsList())
+        
.thenReturn(Collections.singletonList(VideoAnnotationResults.newBuilder().build()));
+    when(future.get()).thenReturn(response);
+    when(serviceClient.annotateVideoAsync(any())).thenReturn(future);
+    VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes =
+        VideoIntelligence.annotateFromBytes(
+            null, Collections.singletonList(Feature.LABEL_DETECTION));
+
+    annotateVideoFromBytes.videoIntelligenceServiceClient = serviceClient;
+    List<VideoAnnotationResults> videoAnnotationResults =
+        annotateVideoFromBytes.getVideoAnnotationResults(TEST_URI, null, null);
+    assertEquals(1, videoAnnotationResults.size());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void shouldThrowErrorWhenBothInputTypesNull()
+      throws ExecutionException, InterruptedException {
+    VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes =
+        VideoIntelligence.annotateFromBytes(
+            null, Collections.singletonList(Feature.LABEL_DETECTION));
+    annotateVideoFromBytes.getVideoAnnotationResults(null, null, null);
+  }
+}
diff --git 
a/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
 
b/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..1f0955d
--- /dev/null
+++ 
b/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1 @@
+mock-maker-inline
diff --git a/settings.gradle b/settings.gradle
index a25f357..b7c832a 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -165,3 +165,6 @@ include "beam-test-infra-metrics"
 project(":beam-test-infra-metrics").dir = file(".test-infra/metrics")
 include "beam-test-tools"
 project(":beam-test-tools").dir = file(".test-infra/tools")
+include 'sdks:java:extensions:ml'
+findProject(':sdks:java:extensions:ml')?.name = 'ml'
+

Reply via email to