Abacn commented on code in PR #25672:
URL: https://github.com/apache/beam/pull/25672#discussion_r1253255757


##########
sdks/java/io/azure-cosmos/build.gradle:
##########
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * License); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+plugins {
+  id("org.apache.beam.module")
+}
+
+applyJavaNature(automaticModuleName: "org.apache.beam.sdk.io.azure.cosmos")
+
+description = "Apache Beam :: SDKs :: Java :: IO :: Azure Cosmos DB"
+ext.summary = "IO library to read and write Azure Cosmos DB"
+
+dependencies {
+  implementation library.java.azure_cosmos
+  permitUnusedDeclared library.java.commons_io // BEAM-11761
+  implementation library.java.guava
+  implementation library.java.jackson_databind
+  // implementation library.java.slf4j_api
+  implementation project(path: ":sdks:java:core", configuration: "shadow")
+  implementation project(path: ":sdks:java:io:azure")
+
+  testImplementation library.java.junit
+  testImplementation library.java.testcontainers_azure
+  testImplementation project(path: ":sdks:java:core", configuration: 
"shadowTest")
+//  testImplementation library.java.mockito_core

Review Comment:
   Remove these commented-out lines?



##########
CHANGES.md:
##########
@@ -60,6 +60,7 @@
 
 * Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
 * Support for Bigtable Change Streams added in Java 
`BigtableIO.ReadChangeStream` 
([#27183](https://github.com/apache/beam/issues/27183))
+* Support for read from Cosmos DB Core SQL API 
[#23604](https://github.com/apache/beam/issues/23604)

Review Comment:
   This will now land in 2.50.0, so please move this entry to the 2.50.0 section.



##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.cosmos.*;
+import com.azure.cosmos.implementation.*;
+import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
+import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.FeedResponse;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import reactor.core.publisher.Mono;
+
+@Experimental(Experimental.Kind.SOURCE_SINK)
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   (Optional) If possible, do not suppress nullness warnings in newly added code.



##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.cosmos.*;

Review Comment:
   Beam does not use wildcard imports:
https://github.com/apache/beam/blob/619e41c7c0a64275dea7f02311e45a92637f43d8/sdks/java/build-tools/src/main/resources/beam/checkstyle.xml#L82



##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosIO.java:
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
+import static 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
+
+import com.azure.cosmos.*;
+import com.azure.cosmos.implementation.*;
+import com.azure.cosmos.implementation.feedranges.FeedRangeInternal;
+import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
+import com.azure.cosmos.models.CosmosQueryRequestOptions;
+import com.azure.cosmos.models.FeedResponse;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.google.auto.value.AutoValue;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.io.BoundedSource;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import 
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import reactor.core.publisher.Mono;
+
+@Experimental(Experimental.Kind.SOURCE_SINK)
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+})
+public class CosmosIO {
+
+  private CosmosIO() {}
+
+  private static final String DEFAULT_QUERY = "SELECT * FROM root";
+
+  /** Provide a {@link Read} {@link PTransform} to read data from a Cosmos DB. 
*/
+  public static <T> Read<T> read(Class<T> classType) {
+    return new 
AutoValue_CosmosIO_Read.Builder<T>().setClassType(classType).build();
+  }
+
+  @AutoValue
+  @AutoValue.CopyAnnotations
+  @SuppressWarnings({"rawtypes"})
+  public abstract static class Read<T> extends PTransform<PBegin, 
PCollection<T>> {
+
+    abstract @Nullable Class<T> getClassType();
+
+    abstract @Nullable String getDatabase();
+
+    abstract @Nullable String getContainer();
+
+    abstract @Nullable String getQuery();
+
+    abstract @Nullable Coder<T> getCoder();
+
+    abstract Builder<T> builder();
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setClassType(Class<T> classType);
+
+      abstract Builder<T> setDatabase(String database);
+
+      abstract Builder<T> setContainer(String container);
+
+      abstract Builder<T> setQuery(String query);
+
+      abstract Builder<T> setCoder(Coder<T> coder);
+
+      abstract Read<T> build();
+    }
+
+    /** Specify the Cosmos database to read from. */
+    public Read<T> withDatabase(String database) {
+      checkArgument(database != null, "database can not be null");
+      checkArgument(!database.isEmpty(), "database can not be empty");
+      return builder().setDatabase(database).build();
+    }
+
+    /** Specify the Cosmos container to read from. */
+    public Read<T> withContainer(String container) {
+      checkArgument(container != null, "container can not be null");
+      checkArgument(!container.isEmpty(), "container can not be empty");
+      return builder().setContainer(container).build();
+    }
+
+    /** Specify the query to read data. */
+    public Read<T> withQuery(String query) {
+      return builder().setQuery(query).build();
+    }
+
+    /** Specify the {@link Coder} used to serialize the document in the {@link 
PCollection}. */
+    public Read<T> withCoder(Coder<T> coder) {
+      checkArgument(coder != null, "coder can not be null");
+      return builder().setCoder(coder).build();
+    }
+
+    @Override
+    public PCollection<T> expand(PBegin input) {
+      checkState(getDatabase() != null, "withDatabase() is required");
+      checkState(getContainer() != null, "withContainer() is required");
+      checkState(getCoder() != null, "withCoder() is required");
+      return input.apply(org.apache.beam.sdk.io.Read.from(new 
BoundedCosmosBDSource<>(this)));
+    }
+  }
+
+  /** A {@link BoundedSource} reading from Comos. */
+  @VisibleForTesting
+  public static class BoundedCosmosBDSource<T> extends BoundedSource<T> {
+
+    private final Read<T> spec;
+    private final NormalizedRange range;
+
+    private @Nullable Long estimatedByteSize;
+
+    BoundedCosmosBDSource(Read<T> spec) {
+      this(spec, NormalizedRange.FULL_RANGE, null);
+    }
+
+    BoundedCosmosBDSource(Read<T> spec, NormalizedRange range, Long 
estimatedSize) {
+      this.spec = spec;
+      this.range = range;
+      this.estimatedByteSize = estimatedSize;
+    }
+
+    @Override
+    public List<? extends BoundedSource<T>> split(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception 
{
+      CosmosClientBuilder builder = 
options.as(CosmosOptions.class).getCosmosClientBuilder();
+      try (CosmosAsyncClient client = builder.buildAsyncClient()) {
+        CosmosAsyncDatabase database = client.getDatabase(spec.getDatabase());
+        CosmosAsyncContainer container = 
database.getContainer(spec.getContainer());
+        AsyncDocumentClient document = 
CosmosBridgeInternal.getAsyncDocumentClient(client);
+
+        List<BoundedCosmosBDSource<T>> sources = new ArrayList<>();
+        long rangeSize = getEstimatedSizeBytes(options);
+        float splitsFloat = (float) rangeSize / desiredBundleSizeBytes;
+        int splits = (int) Math.ceil(splitsFloat);
+
+        // match internal impl of CosmosAsyncContainer trySplitFeedRange
+        String databaseLink =
+            
ImplementationBridgeHelpers.CosmosAsyncDatabaseHelper.getCosmosAsyncDatabaseAccessor()
+                .getLink(database);
+        String containerLink =
+            databaseLink + "/" + Paths.COLLECTIONS_PATH_SEGMENT + "/" + 
container.getId();
+        Mono<Utils.ValueHolder<DocumentCollection>> getCollectionObservable =
+            document
+                .getCollectionCache()
+                .resolveByNameAsync(null, containerLink, null)
+                .map(Utils.ValueHolder::initialize);
+
+        List<NormalizedRange> subRanges =
+            FeedRangeInternal.convert(range.toFeedRange())
+                .trySplit(
+                    document.getPartitionKeyRangeCache(), null, 
getCollectionObservable, splits)
+                .block().stream()
+                .map(NormalizedRange::fromFeedRange)
+                .collect(Collectors.toList());
+
+        long estimatedSubRangeSize = rangeSize / subRanges.size();
+        for (NormalizedRange subrange : subRanges) {
+          sources.add(new BoundedCosmosBDSource<>(spec, subrange, 
estimatedSubRangeSize));
+        }
+
+        return sources;
+      }
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws 
Exception {
+      if (estimatedByteSize != null) {
+        return estimatedByteSize;
+      }
+      CosmosClientBuilder builder = 
options.as(CosmosOptions.class).getCosmosClientBuilder();
+      try (CosmosAsyncClient client = builder.buildAsyncClient()) {
+        CosmosAsyncContainer container =
+            
client.getDatabase(spec.getDatabase()).getContainer(spec.getContainer());
+
+        CosmosChangeFeedRequestOptions requestOptions =
+            
CosmosChangeFeedRequestOptions.createForProcessingFromNow(range.toFeedRange());
+        requestOptions.setMaxItemCount(1);
+        requestOptions.setMaxPrefetchPageCount(1);
+        requestOptions.setQuotaInfoEnabled(true);
+
+        estimatedByteSize =
+            container
+                .queryChangeFeed(requestOptions, ObjectNode.class)
+                .byPage()
+                .take(1)
+                .map(FeedResponse::getDocumentUsage)
+                .map(kb -> kb * 1000)

Review Comment:
   Just to confirm: should this be `kb * 1000` or `kb * 1024`? Also, if possible, add a comment
explaining the numeric literal used in the code.



##########
sdks/java/io/azure-cosmos/src/main/java/org/apache/beam/sdk/io/azure/cosmos/CosmosOptions.java:
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.azure.cosmos;
+
+import com.azure.cosmos.CosmosClientBuilder;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.io.azure.options.AzureOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.DefaultValueFactory;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+@Experimental(Experimental.Kind.SOURCE_SINK)
+@SuppressWarnings({
+  "nullness" // TODO(https://github.com/apache/beam/issues/20497)

Review Comment:
   Same here: if possible, do not suppress nullness warnings in new code.



##########
sdks/java/io/azure-cosmos/README.md:
##########
@@ -0,0 +1,65 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Cosmos DB Core SQL API
+
+Compile all module azure-cosmos
+
+```shell
+gradle sdks:java:io:azure-cosmos:build
+```
+
+Valite code:

Review Comment:
   Typo: "Valite" should be "Validate".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to