This is an automated email from the ASF dual-hosted git repository. mwalenia pushed a commit to branch BEAM-9147-use-ptransforms in repository https://gitbox.apache.org/repos/asf/beam.git
commit 31117953cb7ca7728099be0d1a1c7410dbc14062 Author: Michal Walenia <[email protected]> AuthorDate: Mon Apr 20 13:14:48 2020 +0200 [BEAM-9147] Make VideoIntelligence use PTransform on user-facing API --- .../beam/sdk/extensions/ml/AnnotateVideo.java | 73 +++----------- .../ml/AnnotateVideoBytesWithContextFn.java | 50 ++++++++++ .../{AnnotateVideo.java => AnnotateVideoFn.java} | 10 +- .../extensions/ml/AnnotateVideoFromBytesFn.java | 55 +++++++++++ .../sdk/extensions/ml/AnnotateVideoFromURIFn.java | 54 ++++++++++ .../ml/AnnotateVideoURIWithContextFn.java | 49 +++++++++ .../beam/sdk/extensions/ml/VideoIntelligence.java | 109 +++++++++------------ .../beam/sdk/extensions/ml/package-info.java | 3 + .../beam/sdk/extensions/ml/AnnotateVideoTest.java | 16 ++- .../sdk/extensions/ml/VideoIntelligenceIT.java | 4 +- 10 files changed, 285 insertions(+), 138 deletions(-) diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java index 56e8638..3cedb62 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java @@ -17,87 +17,44 @@ */ package org.apache.beam.sdk.extensions.ml; -import com.google.api.gax.longrunning.OperationFuture; -import com.google.cloud.videointelligence.v1.AnnotateVideoProgress; -import com.google.cloud.videointelligence.v1.AnnotateVideoRequest; -import com.google.cloud.videointelligence.v1.AnnotateVideoResponse; import com.google.cloud.videointelligence.v1.Feature; import com.google.cloud.videointelligence.v1.VideoAnnotationResults; import com.google.cloud.videointelligence.v1.VideoContext; -import com.google.cloud.videointelligence.v1.VideoIntelligenceServiceClient; -import com.google.protobuf.ByteString; -import java.io.IOException; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; -import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; /** - * Base class for Video Intelligence transform. + * Base class for VideoIntelligence PTransform. * - * @param <T> Class of input data being passed in - either ByteString - video data encoded into. - * String or String - a GCS URI of the video to be annotated. + * @param <T> Type of input PCollection contents. */ -public abstract class AnnotateVideo<T> extends DoFn<T, List<VideoAnnotationResults>> { - +@Experimental +public abstract class AnnotateVideo<T> + extends PTransform<PCollection<T>, PCollection<List<VideoAnnotationResults>>> { protected final PCollectionView<Map<T, VideoContext>> contextSideInput; protected final List<Feature> featureList; - VideoIntelligenceServiceClient videoIntelligenceServiceClient; - public AnnotateVideo( + protected AnnotateVideo( PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> featureList) { this.contextSideInput = contextSideInput; this.featureList = featureList; } - public AnnotateVideo(List<Feature> featureList) { - contextSideInput = null; + protected AnnotateVideo(List<Feature> featureList) { + this.contextSideInput = null; this.featureList = featureList; } - @StartBundle - public void startBundle() throws IOException { - videoIntelligenceServiceClient = VideoIntelligenceServiceClient.create(); - } - - @Teardown - public void teardown() { - videoIntelligenceServiceClient.close(); - } - /** - * Call the Video Intelligence Cloud AI service and return annotation results. + * To be implemented based on input PCollection's content type. * - * @param elementURI This or elementContents is required. GCS address of video to be annotated - * @param elementContents this or elementURI is required. Hex-encoded contents of video to be - * annotated - * @param videoContext Optional context for video annotation. + * @param input * @return */ - List<VideoAnnotationResults> getVideoAnnotationResults( - String elementURI, ByteString elementContents, VideoContext videoContext) - throws InterruptedException, ExecutionException { - AnnotateVideoRequest.Builder requestBuilder = - AnnotateVideoRequest.newBuilder().addAllFeatures(featureList); - if (elementURI != null) { - requestBuilder.setInputUri(elementURI); - } else if (elementContents != null) { - requestBuilder.setInputContent(elementContents); - } else { - throw new IllegalArgumentException("Either elementURI or elementContents should be non-null"); - } - if (videoContext != null) { - requestBuilder.setVideoContext(videoContext); - } - AnnotateVideoRequest annotateVideoRequest = requestBuilder.build(); - OperationFuture<AnnotateVideoResponse, AnnotateVideoProgress> annotateVideoAsync = - videoIntelligenceServiceClient.annotateVideoAsync(annotateVideoRequest); - return annotateVideoAsync.get().getAnnotationResultsList(); - } - - /** Process element implementation required. */ - @ProcessElement - public abstract void processElement(ProcessContext context) - throws ExecutionException, InterruptedException; + @Override + public abstract PCollection<List<VideoAnnotationResults>> expand(PCollection<T> input); } diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java new file mode 100644 index 0000000..c8edcd1 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoBytesWithContextFn.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.KV; + +/** + * Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the + * ByteString encoded video contents, values - VideoContext objects. + */ +@Experimental +class AnnotateVideoBytesWithContextFn extends AnnotateVideoFn<KV<ByteString, VideoContext>> { + + public AnnotateVideoBytesWithContextFn(List<Feature> featureList) { + super(featureList); + } + + /** ProcessElement implementation. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + ByteString element = context.element().getKey(); + VideoContext videoContext = context.element().getValue(); + List<VideoAnnotationResults> videoAnnotationResults = + getVideoAnnotationResults(null, element, videoContext); + context.output(videoAnnotationResults); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java similarity index 93% copy from sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java copy to sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java index 56e8638..3cf0c30 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideo.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFn.java @@ -30,28 +30,30 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.PCollectionView; /** - * Base class for Video Intelligence transform. + * Base class for DoFns used in VideoIntelligence transforms. * * @param <T> Class of input data being passed in - either ByteString - video data encoded into. * String or String - a GCS URI of the video to be annotated. */ -public abstract class AnnotateVideo<T> extends DoFn<T, List<VideoAnnotationResults>> { +@Experimental +abstract class AnnotateVideoFn<T> extends DoFn<T, List<VideoAnnotationResults>> { protected final PCollectionView<Map<T, VideoContext>> contextSideInput; protected final List<Feature> featureList; VideoIntelligenceServiceClient videoIntelligenceServiceClient; - public AnnotateVideo( + public AnnotateVideoFn( PCollectionView<Map<T, VideoContext>> contextSideInput, List<Feature> featureList) { this.contextSideInput = contextSideInput; this.featureList = featureList; } - public AnnotateVideo(List<Feature> featureList) { + public AnnotateVideoFn(List<Feature> featureList) { contextSideInput = null; this.featureList = featureList; } diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java new file mode 100644 index 0000000..20aa083 --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromBytesFn.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import com.google.protobuf.ByteString; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Implementation of AnnotateVideoFn accepting ByteStrings as contents of input PCollection. Videos + * decoded from the ByteStrings are annotated. + */ +@Experimental +class AnnotateVideoFromBytesFn extends AnnotateVideoFn<ByteString> { + + public AnnotateVideoFromBytesFn( + PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) { + super(contextSideInput, featureList); + } + + /** Implementation of ProcessElement. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + ByteString element = context.element(); + VideoContext videoContext = null; + if (contextSideInput != null) { + videoContext = context.sideInput(contextSideInput).get(element); + } + List<VideoAnnotationResults> videoAnnotationResults = + getVideoAnnotationResults(null, element, videoContext); + context.output(videoAnnotationResults); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java new file mode 100644 index 0000000..5dfea0c --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoFromURIFn.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * Implementation of AnnotateVideoFn accepting Strings as contents of input PCollection. Annotates + * videos found on GCS based on URIs from input PCollection. + */ +@Experimental +class AnnotateVideoFromURIFn extends AnnotateVideoFn<String> { + + public AnnotateVideoFromURIFn( + PCollectionView<Map<String, VideoContext>> contextSideInput, List<Feature> featureList) { + super(contextSideInput, featureList); + } + + /** ProcessElement implementation. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + String elementURI = context.element(); + VideoContext videoContext = null; + if (contextSideInput != null) { + videoContext = context.sideInput(contextSideInput).get(elementURI); + } + List<VideoAnnotationResults> annotationResultsList = + getVideoAnnotationResults(elementURI, null, videoContext); + context.output(annotationResultsList); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java new file mode 100644 index 0000000..a165d5a --- /dev/null +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoURIWithContextFn.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.ml; + +import com.google.cloud.videointelligence.v1.Feature; +import com.google.cloud.videointelligence.v1.VideoAnnotationResults; +import com.google.cloud.videointelligence.v1.VideoContext; +import java.util.List; +import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.values.KV; + +/** + * Implementation of AnnotateVideoFn accepting KVs as contents of input PCollection. Keys are the + * GCS URIs, values - VideoContext objects. + */ +@Experimental +class AnnotateVideoURIWithContextFn extends AnnotateVideoFn<KV<String, VideoContext>> { + + public AnnotateVideoURIWithContextFn(List<Feature> featureList) { + super(featureList); + } + + /** ProcessElement implementation. */ + @Override + public void processElement(ProcessContext context) + throws ExecutionException, InterruptedException { + String elementURI = context.element().getKey(); + VideoContext videoContext = context.element().getValue(); + List<VideoAnnotationResults> videoAnnotationResults = + getVideoAnnotationResults(elementURI, null, videoContext); + context.output(videoAnnotationResults); + } +} diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java index 0f447da..5fa25d6 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/VideoIntelligence.java @@ -23,8 +23,10 @@ import com.google.cloud.videointelligence.v1.VideoContext; import com.google.protobuf.ByteString; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; +import org.apache.beam.sdk.annotations.Experimental; +import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; /** @@ -37,6 +39,7 @@ import org.apache.beam.sdk.values.PCollectionView; * * <p>Service account with proper permissions is required to use these transforms. */ +@Experimental public class VideoIntelligence { /** @@ -44,11 +47,11 @@ public class VideoIntelligence { * * @param featureList List of features to be annotated * @param contextSideInput Optional side input with map of contexts to URIs - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ - public static AnnotateVideoFromURI annotateFromURI( + public static AnnotateVideoFromUri annotateFromURI( List<Feature> featureList, PCollectionView<Map<String, VideoContext>> contextSideInput) { - return new AnnotateVideoFromURI(contextSideInput, featureList); + return new AnnotateVideoFromUri(contextSideInput, featureList); } /** @@ -56,7 +59,7 @@ public class VideoIntelligence { * * @param featureList List of features to be annotated * @param contextSideInput Optional side input with map of contexts to ByteStrings - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ public static AnnotateVideoFromBytes annotateFromBytes( PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) { @@ -67,118 +70,96 @@ public class VideoIntelligence { * Annotates videos from key-value pairs of GCS URI and VideoContext. * * @param featureList List of features to be annotated - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ - public static AnnotateVideoURIWithContext annotateFromUriWithContext(List<Feature> featureList) { - return new AnnotateVideoURIWithContext(featureList); + public static AnnotateVideoFromURIWithContext annotateFromUriWithContext( + List<Feature> featureList) { + return new AnnotateVideoFromURIWithContext(featureList); } /** * Annotates videos from key-value pairs of ByteStrings and VideoContext. * * @param featureList List of features to be annotated - * @return DoFn performing the necessary operations + * @return PTransform performing the necessary operations */ - public static AnnotateVideoBytesWithContext annotateFromBytesWithContext( + public static AnnotateVideoFromBytesWithContext annotateFromBytesWithContext( List<Feature> featureList) { - return new AnnotateVideoBytesWithContext(featureList); + return new AnnotateVideoFromBytesWithContext(featureList); } /** - * Implementation of AnnotateVideo accepting Strings as contents of input PCollection. Annotates - * videos found on GCS based on URIs from input PCollection. + * Implementation of {@link AnnotateVideo} taking a PCollection of {@link String} and an optional + * side input with a context map. */ - public static class AnnotateVideoFromURI extends AnnotateVideo<String> { + @Experimental + public static class AnnotateVideoFromUri extends AnnotateVideo<String> { - public AnnotateVideoFromURI( + protected AnnotateVideoFromUri( PCollectionView<Map<String, VideoContext>> contextSideInput, List<Feature> featureList) { super(contextSideInput, featureList); } - /** ProcessElement implementation. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - String elementURI = context.element(); - VideoContext videoContext = null; - if (contextSideInput != null) { - videoContext = context.sideInput(contextSideInput).get(elementURI); - } - List<VideoAnnotationResults> annotationResultsList = - getVideoAnnotationResults(elementURI, null, videoContext); - context.output(annotationResultsList); + public PCollection<List<VideoAnnotationResults>> expand(PCollection<String> input) { + return input.apply(ParDo.of(new AnnotateVideoFromURIFn(contextSideInput, featureList))); } } /** - * Implementation of AnnotateVideo accepting ByteStrings as contents of input PCollection. Videos - * decoded from the ByteStrings are annotated. + * Implementation of {@link AnnotateVideo} taking a PCollection of {@link ByteString} and an + * optional side input with a context map. */ + @Experimental public static class AnnotateVideoFromBytes extends AnnotateVideo<ByteString> { - public AnnotateVideoFromBytes( + protected AnnotateVideoFromBytes( PCollectionView<Map<ByteString, VideoContext>> contextSideInput, List<Feature> featureList) { super(contextSideInput, featureList); } - /** Implementation of ProcessElement. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - ByteString element = context.element(); - VideoContext videoContext = null; - if (contextSideInput != null) { - videoContext = context.sideInput(contextSideInput).get(element); - } - List<VideoAnnotationResults> videoAnnotationResults = - getVideoAnnotationResults(null, element, videoContext); - context.output(videoAnnotationResults); + public PCollection<List<VideoAnnotationResults>> expand(PCollection<ByteString> input) { + return input.apply(ParDo.of(new AnnotateVideoFromBytesFn(contextSideInput, featureList))); } } /** - * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the - * GCS URIs, values - VideoContext objects. + * Implementation of {@link AnnotateVideo} taking a PCollection of {@link KV} of {@link String} + * and {@link VideoContext}. */ - public static class AnnotateVideoURIWithContext extends AnnotateVideo<KV<String, VideoContext>> { + @Experimental + public static class AnnotateVideoFromURIWithContext + extends AnnotateVideo<KV<String, VideoContext>> { - public AnnotateVideoURIWithContext(List<Feature> featureList) { + protected AnnotateVideoFromURIWithContext(List<Feature> featureList) { super(featureList); } - /** ProcessElement implementation. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - String elementURI = context.element().getKey(); - VideoContext videoContext = context.element().getValue(); - List<VideoAnnotationResults> videoAnnotationResults = - getVideoAnnotationResults(elementURI, null, videoContext); - context.output(videoAnnotationResults); + public PCollection<List<VideoAnnotationResults>> expand( + PCollection<KV<String, VideoContext>> input) { + return input.apply(ParDo.of(new AnnotateVideoURIWithContextFn(featureList))); } } /** - * Implementation of AnnotateVideo accepting KVs as contents of input PCollection. Keys are the - * ByteString encoded video contents, values - VideoContext objects. + * Implementation of {@link AnnotateVideo} taking a PCollection of {@link KV} of {@link + * ByteString} and {@link VideoContext}. */ - public static class AnnotateVideoBytesWithContext + @Experimental + public static class AnnotateVideoFromBytesWithContext extends AnnotateVideo<KV<ByteString, VideoContext>> { - public AnnotateVideoBytesWithContext(List<Feature> featureList) { + protected AnnotateVideoFromBytesWithContext(List<Feature> featureList) { super(featureList); } - /** ProcessElement implementation. */ @Override - public void processElement(ProcessContext context) - throws ExecutionException, InterruptedException { - ByteString element = context.element().getKey(); - VideoContext videoContext = context.element().getValue(); - List<VideoAnnotationResults> videoAnnotationResults = - getVideoAnnotationResults(null, element, videoContext); - context.output(videoAnnotationResults); + public PCollection<List<VideoAnnotationResults>> expand( + PCollection<KV<ByteString, VideoContext>> input) { + return input.apply(ParDo.of(new AnnotateVideoBytesWithContextFn(featureList))); } } } diff --git a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java index ad5216d..241d902 100644 --- a/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java +++ b/sdks/java/extensions/ml/src/main/java/org/apache/beam/sdk/extensions/ml/package-info.java @@ -16,4 +16,7 @@ * limitations under the License. */ /** Provides DoFns for integration with Google Cloud AI Video Intelligence service. */ +@Experimental package org.apache.beam.sdk.extensions.ml; + +import org.apache.beam.sdk.annotations.Experimental; diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java index 57400e4..56473aa 100644 --- a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java +++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/AnnotateVideoTest.java @@ -52,22 +52,20 @@ public class AnnotateVideoTest { .thenReturn(Collections.singletonList(VideoAnnotationResults.newBuilder().build())); when(future.get()).thenReturn(response); when(serviceClient.annotateVideoAsync(any())).thenReturn(future); - VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes = - VideoIntelligence.annotateFromBytes( - null, Collections.singletonList(Feature.LABEL_DETECTION)); + AnnotateVideoFromBytesFn annotateVideoFromBytesFn = + new AnnotateVideoFromBytesFn(null, Collections.singletonList(Feature.LABEL_DETECTION)); - annotateVideoFromBytes.videoIntelligenceServiceClient = serviceClient; + annotateVideoFromBytesFn.videoIntelligenceServiceClient = serviceClient; List<VideoAnnotationResults> videoAnnotationResults = - annotateVideoFromBytes.getVideoAnnotationResults(TEST_URI, null, null); + annotateVideoFromBytesFn.getVideoAnnotationResults(TEST_URI, null, null); assertEquals(1, videoAnnotationResults.size()); } @Test(expected = IllegalArgumentException.class) public void shouldThrowErrorWhenBothInputTypesNull() throws ExecutionException, InterruptedException { - VideoIntelligence.AnnotateVideoFromBytes annotateVideoFromBytes = - VideoIntelligence.annotateFromBytes( - null, Collections.singletonList(Feature.LABEL_DETECTION)); - annotateVideoFromBytes.getVideoAnnotationResults(null, null, null); + AnnotateVideoFromBytesFn annotateVideoFromBytesFn = + new AnnotateVideoFromBytesFn(null, Collections.singletonList(Feature.LABEL_DETECTION)); + annotateVideoFromBytesFn.getVideoAnnotationResults(null, null, null); } } diff --git a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java index b0b74ee..6427225 100644 --- a/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java +++ b/sdks/java/extensions/ml/src/test/java/org/apache/beam/sdk/extensions/ml/VideoIntelligenceIT.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.extensions.ml; -import static org.apache.beam.sdk.extensions.ml.VideoIntelligence.annotateFromURI; import static org.junit.Assert.assertEquals; import com.google.cloud.videointelligence.v1.Feature; @@ -29,7 +28,6 @@ import java.util.function.Consumer; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.values.PCollection; import org.junit.Rule; @@ -49,7 +47,7 @@ public class VideoIntelligenceIT { PCollection<List<VideoAnnotationResults>> annotationResults = testPipeline .apply(Create.of(VIDEO_URI)) - .apply("Annotate video", ParDo.of(annotateFromURI(featureList, null))); + .apply("Annotate video", VideoIntelligence.annotateFromURI(featureList, null)); PAssert.that(annotationResults).satisfies(new VerifyVideoAnnotationResult()); testPipeline.run().waitUntilFinish(); }
