This is an automated email from the ASF dual-hosted git repository. mwalenia pushed a commit to branch BEAM-9147-videointelligence in repository https://gitbox.apache.org/repos/asf/beam.git
commit a7f55e60c1cfe39112331dd4e2b34bc522a2c77a Author: Michal Walenia <[email protected]> AuthorDate: Tue Mar 24 15:41:31 2020 +0100 [BEAM-9147] Add VideoIntelligence transform --- sdks/java/extensions/ml/build.gradle | 34 ++++++ .../beam/sdk/extensions/ml/AnnotateVideo.java | 81 +++++++++++++ .../beam/sdk/extensions/ml/VideoIntelligence.java | 126 +++++++++++++++++++++ .../beam/sdk/extensions/ml/AnnotateVideoTest.java | 73 ++++++++++++ .../org.mockito.plugins.MockMaker | 1 + settings.gradle | 3 + 6 files changed, 318 insertions(+) diff --git a/sdks/java/extensions/ml/build.gradle b/sdks/java/extensions/ml/build.gradle new file mode 100644 index 0000000..482decc --- /dev/null +++ b/sdks/java/extensions/ml/build.gradle @@ -0,0 +1,34 @@ +/* + * + * * Licensed to the Apache Software Foundation (ASF) under one + * * or more contributor license agreements. See the NOTICE file + * * distributed with this work for additional information + * * regarding copyright ownership. The ASF licenses this file + * * to you under the Apache License, Version 2.0 (the + * * "License"); you may not use this file except in compliance + * * with the License. You may obtain a copy of the License at + * * + * * http://www.apache.org/licenses/LICENSE-2.0 + * * + * * Unless required by applicable law or agreed to in writing, software + * * distributed under the License is distributed on an "AS IS" BASIS, + * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * * See the License for the specific language governing permissions and + * * limitations under the License. + * + */ + +plugins { id 'org.apache.beam.module' } +applyJavaNature(automaticModuleName: 'org.apache.beam.sdk.extensions.protobuf') + +description = 'Apache Beam :: SDKs :: Java :: Extensions :: ML' + +dependencies { + compile project(path: ":sdks:java:core", configuration: "shadow") + compile project(":sdks:java:expansion-service") + testCompile project(path: ':sdks:java:core', configuration: 'shadowTest') + compile 'com.google.cloud:google-cloud-video-intelligence:1.2.0' + testCompile library.java.mockito_core + testCompile 'com.google.cloud:google-cloud-video-intelligence:1.2.0' + testCompile library.java.junit +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java new file mode 100644 index 0000000..edb5c04 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.videointelligence.v1.*; +import com.google.protobuf.ByteString; +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.values.PCollectionView; + +public abstract class AnnotateVideo<T> extends DoFn<T, List<VideoAnnotationResults>> { + + protected final PCollectionView<Map<T, VideoContext>> contextSideInput; + protected final List<Feature> featureList; + VideoIntelligenceServiceClient videoIntelligenceServiceClient; + + public AnnotateVideo( + PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> featureList) { + this.contextSideInput = contextSideInput; + this.featureList = featureList; + } + + public AnnotateVideo(List<Feature> featureList) { + contextSideInput = null; + this.featureList = featureList; + } + + @StartBundle + public void startBundle() throws IOException { + videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create(); + } + + @Teardown + public void teardown() { + videoIntelligenceServiceClient.close(); + } + + List<VideoAnnotationResults> getVideoAnnotationResults( + String elementURI, ByteString elementContents, VideoContext videoContext) + throws InterruptedException, ExecutionException { + AnnotateVideoRequest.Builder requestBuilder = + AnnotateVideoRequest.newBuilder().addAllFeatures(featureList); + if (elementURI != null) { + requestBuilder.setInputUri(elementURI); + } else if (elementContents != null) { + requestBuilder.setInputContent(elementContents); + } else { + throw new IllegalArgumentException("Either elementURI or elementContents should be non-null"); + } + if (videoContext != null) { + requestBuilder.setVideoContext(videoContext); + } + AnnotateVideoRequest annotateVideoRequest = requestBuilder.build(); + OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> annotateVideoAsync = + videoIntelligenceServiceClient.annotateVideoAsync(annotateVideoRequest); + return annotateVideoAsync.get().getAnnotationResultsList(); + } + + @ProcessElement + public abstract void processElement(ProcessContext context) + throws ExecutionException, InterruptedException; +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java new file mode 100644 index 0000000..4b01de0 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.*; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; + +public class VideoIntelligence { + + public static AnnotateVideoFromURI annotateFromURI( + List<Feature> featureList, PCollectionView<Map<String, VideoContext>> contextSideInput) { + return new AnnotateVideoFromURI(contextSideInput, featureList); + } + + public static AnnotateVideoFromBytes annotateFromBytes( + PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) { + return new AnnotateVideoFromBytes(contextSideInput, featureList); + } + + public static AnnotateVideoURIWithContext annotateFromUriWithContext(List<Feature> featureList) { + return new AnnotateVideoURIWithContext(featureList); + } + + public static AnnotateVideoBytesWithContext annotateFromBytesWithContext( + List<Feature> featureList) { + return new AnnotateVideoBytesWithContext(featureList); + } + + public static class AnnotateVideoFromURI extends AnnotateVideo<String> { + + public AnnotateVideoFromURI( + PCollectionView<Map<String, VideoContext>> contextSideInput, List<Feature> featureList) { + super(contextSideInput, featureList); + } + + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + String elementURI = context.element(); + VideoContext videoContext = null; + if (contextSideInput != null) { + videoContext = context.sideInput(contextSideInput).get(elementURI); + } + List<VideoAnnotationResults> annotationResultsList = + getVideoAnnotationResults(elementURI, null, videoContext); + context.output(annotationResultsList); + } + } + + public static class AnnotateVideoFromBytes extends AnnotateVideo<ByteString> { + + public AnnotateVideoFromBytes( + PCollectionView<Map<ByteString, VideoContext>> contextSideInput, + List<Feature> featureList) { + super(contextSideInput, featureList); + } + + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + ByteString element = context.element(); + VideoContext videoContext = null; + if (contextSideInput != null) { + videoContext = context.sideInput(contextSideInput).get(element); + } + List<VideoAnnotationResults> videoAnnotationResults = + getVideoAnnotationResults(null, element, videoContext); + context.output(videoAnnotationResults); + } + } + + public static class AnnotateVideoURIWithContext extends AnnotateVideo<KV<String, VideoContext>> { + + public AnnotateVideoURIWithContext(List<Feature> featureList) { + super(featureList); + } + + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + String elementURI = context.element().getKey(); + VideoContext videoContext = context.element().getValue(); + List<VideoAnnotationResults> videoAnnotationResults = + getVideoAnnotationResults(elementURI, null, videoContext); + context.output(videoAnnotationResults); + } + } + + public static class AnnotateVideoBytesWithContext + extends AnnotateVideo<KV<ByteString, VideoContext>> { + + public AnnotateVideoBytesWithContext(List<Feature> featureList) { + super(featureList); + } + + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + ByteString element = context.element().getKey(); + VideoContext videoContext = context.element().getValue(); + List<VideoAnnotationResults> videoAnnotationResults = + getVideoAnnotationResults(null, element, videoContext); + context.output(videoAnnotationResults); + } + } +} diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java new file mode 100644 index 0000000..57400e4 --- /dev/null +++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import com.google.api.gax.longrunning.OperationFuture; +import com.google.cloud.videointelligence.v1.AnnotateVideoProgress; +import com.google.cloud.videointelligence.v1.AnnotateVideoResponse; +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient; +import com.google.protobuf.ByteString; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class AnnotateVideoTest { + + private static final String TEST_URI = "fake_uri"; + private static final ByteString TEST_BYTES = ByteString.copyFromUtf8("12345"); + + @Mock private VideoIntelligenceServiceClient serviceClient; + @Mock private OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> future; + @Mock private AnnotateVideoResponse response; + + @Test + public void shouldReturnAListOfAnnotations() throws ExecutionException, InterruptedException { + when(response.getAnnotationResultsList()) + .thenReturn(Collections.singletonList(VideoAnnotationResults.newBuilder().build())); + when(future.get()).thenReturn(response); + when(serviceClient.annotateVideoAsync(any())).thenReturn(future); + VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes = + VideoIntelligence.annotateFromBytes( + null, Collections.singletonList(Feature.LABEL_DETECTION)); + + annotateVideoFromBytes.videoIntelligenceServiceClient = serviceClient; + List<VideoAnnotationResults> videoAnnotationResults = + annotateVideoFromBytes.getVideoAnnotationResults(TEST_URI, null, null); + assertEquals(1, videoAnnotationResults.size()); + } + + @Test(expected = IllegalArgumentException.class) + public void shouldThrowErrorWhenBothInputTypesNull() + throws ExecutionException, InterruptedException { + VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes = + VideoIntelligence.annotateFromBytes( + null, Collections.singletonList(Feature.LABEL_DETECTION)); + annotateVideoFromBytes.getVideoAnnotationResults(null, null, null); + } +} diff --git a/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker new file mode 100644 index 0000000..1f0955d --- /dev/null +++ b/sdks/java/extensions/ml/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker @@ -0,0 +1 @@ +mock-maker-inline diff --git a/settings.gradle b/settings.gradle index a25f357..b7c832a 100644 --- a/settings.gradle +++ b/settings.gradle @@ -165,3 +165,6 @@ include "beam-test-infra-metrics" project(":beam-test-infra-metrics").dir = file(".test-infra/metrics") include "beam-test-tools" project(":beam-test-tools").dir = file(".test-infra/tools") +include 'sdks:java:extensions:ml' +findProject(':sdks:java:extensions:ml')?.name = 'ml' +
