kennknowles commented on code in PR #31220:
URL: https://github.com/apache/beam/pull/31220#discussion_r1594815933


##########
.github/workflows/beam_PreCommit_Java_Iceberg_IO_Direct.yml:
##########
@@ -13,21 +13,27 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-name: IcebergIO Unit Tests
+name: PreCommit Java IcebergIO Direct

Review Comment:
   nit: I really dislike this pattern of naming, and did not copy it, because:
   
    - "PreCommit" jobs also run in postcommit, so the prefix is actively
      misleading and should not be included. (And PostCommit also runs in
      precommit on request, so it too is not useful, FWIW.)
    - "Java" doesn't add any information. IcebergIO is only in Java.
    - "Direct" refers to the direct runner, which is intended for unit testing
      transforms, so it is redundant. Also, new contributors have no context
      for what this abbreviated name means. The name carries just as much
      information without it.
   
   I'm OK with your change; I see why you would do it to make it consistent 
with the others. I just deliberately made this one better than the others. 
Trying to take tiny steps forward, because the hassle of doing global renames 
is too much sometimes.
   
   But while it may seem to add value to have consistency, I feel the opposite:
   it encourages people to build fragile stuff based on regex matching. If we can
   do anything to make that mistake less likely, it is a good thing.
   
   In summary, the only word in the whole name that really carries information
   is "IcebergIO", so a good name might be "IcebergIO Tests". It could be useful
   to say what kind of tests, like "IcebergIO Unit Tests and Small ITs".
   
   Just my $0.02 that you can take or leave.



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/SchemaAndRowConversions.java:
##########
@@ -20,22 +20,32 @@
 import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
 
 import java.nio.ByteBuffer;
+import java.util.Map;
 import java.util.Optional;
 import java.util.UUID;
 import org.apache.beam.sdk.schemas.Schema;
 import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.iceberg.data.GenericRecord;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.joda.time.DateTime;
 import org.joda.time.DateTimeZone;
 
 class SchemaAndRowConversions {
 
-  private SchemaAndRowConversions() {}

Review Comment:
   This private constructor is a design pattern for classes that are not meant
   to be instantiated and exist only as a container for static members.
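
For context, a minimal sketch of that pattern. The class and method names
below are made up for illustration and are not from this PR:

```java
/** Hypothetical utility class: only static members, never instantiated. */
final class TemperatureConversions {

  // The private constructor stops callers (and subclasses) from creating
  // instances. Without it, Java would generate a default constructor with
  // the same visibility as the class.
  private TemperatureConversions() {}

  /** Converts degrees Celsius to degrees Fahrenheit. */
  static double celsiusToFahrenheit(double celsius) {
    return celsius * 9.0 / 5.0 + 32.0;
  }
}
```

Callers use it as `TemperatureConversions.celsiusToFahrenheit(100.0)`;
`new TemperatureConversions()` does not compile outside the class.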



##########
.github/workflows/beam_PreCommit_Java_Iceberg_IO_Direct.yml:
##########
@@ -88,7 +94,7 @@ jobs:
       - name: run IcebergIO build script
         uses: ./.github/actions/gradle-command-self-hosted-action
         with:
-          gradle-command: :sdks:java:io:iceberg:build
+          gradle-command: :sdks:java:io:iceberg:build :sdks:java:io:iceberg:integrationTest

Review Comment:
   So, this works for now. But when you add tests that are too big for the
   direct runner, I think you need another job. And I presume you would then
   pass in pipeline options that control the data volume, so that you could
   reuse existing tests at higher scale.



##########
sdks/java/io/iceberg/src/test/java/org/apache/beam/sdk/io/iceberg/IcebergIOIT.java:
##########
@@ -0,0 +1,266 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.CatalogUtil;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.data.parquet.GenericParquetWriter;
+import org.apache.iceberg.encryption.InputFilesDecryptor;
+import org.apache.iceberg.hadoop.HadoopCatalog;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.DataWriter;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.io.OutputFile;
+import org.apache.iceberg.parquet.Parquet;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class IcebergIOIT implements Serializable {
+
+  public interface IcebergIOTestPipelineOptions extends GcpOptions {
+    @Description("Number of records that will be written and/or read by the test")
+    @Default.Integer(1000) // deliberately small so no-args execution is quick
+    Integer getNumRecords();
+
+    void setNumRecords(Integer numRecords);
+
+    @Description("Number of shards in the test table")
+    @Default.Integer(10)
+    Integer getNumShards();
+
+    void setNumShards(Integer numShards);
+  }
+
+  @Rule public TestPipeline writePipeline = TestPipeline.create();
+
+  @Rule public TestPipeline readPipeline = TestPipeline.create();
+
+  static IcebergIOTestPipelineOptions options;
+
+  static Configuration catalogHadoopConf;
+
+  @BeforeClass
+  public static void beforeClass() {
+    PipelineOptionsFactory.register(IcebergIOTestPipelineOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(IcebergIOTestPipelineOptions.class);
+
+    catalogHadoopConf = new Configuration();
+    catalogHadoopConf.set("fs.gs.project.id", options.getProject());
+    catalogHadoopConf.set("fs.gs.auth.type", "SERVICE_ACCOUNT_JSON_KEYFILE");
+    catalogHadoopConf.set(
+        "fs.gs.auth.service.account.json.keyfile", System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
+  }
+
+  static final org.apache.beam.sdk.schemas.Schema BEAM_SCHEMA =
+      org.apache.beam.sdk.schemas.Schema.builder()
+          .addInt32Field("int")
+          .addFloatField("float")
+          .addDoubleField("double")
+          .addInt64Field("long")
+          .addStringField("str")
+          .addBooleanField("bool")
+          .addByteArrayField("bytes")
+          .build();
+
+  static final Schema ICEBERG_SCHEMA =
+      SchemaAndRowConversions.beamSchemaToIcebergSchema(BEAM_SCHEMA);
+
+  Map<String, Object> getValues(int num) {
+    String strNum = Integer.toString(num);
+    return ImmutableMap.<String, Object>builder()
+        .put("int", num)
+        .put("float", Float.valueOf(strNum))
+        .put("double", Double.valueOf(strNum))
+        .put("long", Long.valueOf(strNum))
+        .put("str", strNum)
+        .put("bool", num % 2 == 0)
+        .put("bytes", ByteBuffer.wrap(new byte[] {(byte) num}))
+        .build();
+  }
+
+  /**
+   * Populates the Iceberg table according to the configuration specified in {@link
+   * IcebergIOTestPipelineOptions}. Returns a {@link List<Row>} of expected elements.
+   */
+  List<Row> populateTable(Table table) throws IOException {

Review Comment:
   The problem here is that this method is not scale-independent. You can't run
   the `integrationTest` job on e.g. Dataflow with pipeline options that scale
   up too big for the direct runner. What you have done is definitely useful
   as-is. But what we also really need are tests where we don't rely on being
   able to hold the expected results in memory. So you need to calculate a
   signature/checksum of some sort and test against that when you move to a
   scale-independent test.
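
To illustrate the checksum idea, here is a rough sketch in plain Java. It
assumes each record can be regenerated deterministically from its index (as
`getValues(int)` does) and has a canonical string form. The class, the method
names, and the choice of summing CRC32 values are all assumptions made for
illustration, not existing Beam utilities and not necessarily the approach
this test should end up with:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

/** Hypothetical helper: an order-independent checksum over generated records. */
final class OrderIndependentChecksum {

  private OrderIndependentChecksum() {}

  /** Canonical text form of the record for one index (a stand-in for a real Row). */
  static String recordFor(long num) {
    return num + "|" + (num % 2 == 0);
  }

  /** CRC32 of a single record's canonical form. */
  static long hashOf(String record) {
    CRC32 crc = new CRC32();
    crc.update(record.getBytes(StandardCharsets.UTF_8));
    return crc.getValue();
  }

  /**
   * Combines two partial checksums. Addition (with wrap-around on overflow) is
   * commutative and associative, so the result does not depend on the order in
   * which records arrive or how they are grouped.
   */
  static long combine(long left, long right) {
    return left + right;
  }

  /** Streams over the indices; memory use is constant regardless of numRecords. */
  static long expectedChecksum(long numRecords) {
    long acc = 0L;
    for (long i = 0; i < numRecords; i++) {
      acc = combine(acc, hashOf(recordFor(i)));
    }
    return acc;
  }
}
```

The test would then compute the same combined value over the rows actually
read and compare two `long`s instead of two lists. A sum of CRC32 values is a
weak signature; a stronger per-record hash could be swapped in without
changing the structure.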



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
