kbendick commented on a change in pull request #3663:
URL: https://github.com/apache/iceberg/pull/3663#discussion_r762373289
##########
File path:
spark/v2.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkDataWrite.java
##########
@@ -189,6 +192,56 @@ public void testAppend() throws IOException {
Assert.assertEquals("Result rows should match", expected, actual);
}
+ @Test
+ public void testConcurrentAppend() throws IOException {
+ File parent = temp.newFolder(format.toString());
+ File location = new File(parent, "test");
+
+ HadoopTables tables = new HadoopTables(CONF);
+ Map<String, String> properties = new HashMap<>();
+ properties.put(TableProperties.COMMIT_NUM_RETRIES, "1000");
+ Table table = tables.create(SCHEMA, null, properties, location.toString());
+
+ List<SimpleRecord> records = Lists.newArrayList(
+ new SimpleRecord(1, "a"),
+ new SimpleRecord(2, "b"),
+ new SimpleRecord(3, "c")
+ );
+
+ Dataset<Row> df = spark.createDataFrame(records, SimpleRecord.class);
+ int threadsCount = 10;
Review comment:
Nit: Same thing here about lowering the thread count.
##########
File path: core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
##########
@@ -419,4 +422,33 @@ public void testCanReadOldCompressedManifestFiles() throws Exception {
List<FileScanTask> tasks = Lists.newArrayList(reloaded.newScan().planFiles());
Assert.assertEquals("Should scan 1 files", 1, tasks.size());
}
+
+ @Test
+ public void testConcurrentAppend() throws Exception {
+ assertTrue("Should create v1 metadata",
+ version(1).exists() && version(1).isFile());
+ File dir = temp.newFolder();
+ dir.delete();
+
+ Table tableWithHighRetries = TABLES.create(SCHEMA, SPEC, new HashMap<String, String>() {
+ {
+ put(TableProperties.COMMIT_NUM_RETRIES, "1000");
+ }
+ }, dir.toURI().toString());
+ int threadsCount = 30;
+ Thread[] threads = new Thread[threadsCount];
+ for (int i = 0; i < threadsCount; i++) {
+ threads[i] = new Thread(() -> tableWithHighRetries.newAppend().appendFile(FILE_A).commit());
+ threads[i].start();
+ }
+ Arrays.stream(threads).forEach(t -> {
+ try {
+ t.join();
+ } catch (InterruptedException e) {
+ // intentionally swallow to check result later
+ }
+ });
+ tableWithHighRetries.refresh();
+ assertEquals(threadsCount, Lists.newArrayList(tableWithHighRetries.snapshots()).size());
Review comment:
Is there a risk that this test will be flaky? I see that the number of
commit retries is set to 1000, but our CI environment can get very busy.
I worry that needing up to 1000 retries will noticeably increase the time
CI takes. We do share test resources with the rest of the ASF, after all.
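To illustrate the concern, here is a toy model of optimistic commits with retries. It is only a sketch: `CommitRetrySketch` and its counters are hypothetical stand-ins built on an `AtomicInteger`, not Iceberg's actual commit path, so the numbers say nothing about real Hadoop table commits. It just shows that every failed attempt is extra work, and that the amount of that work grows with the number of writers racing each other.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Toy model only: an AtomicInteger stands in for the table's metadata version.
class CommitRetrySketch {
  private final AtomicInteger version = new AtomicInteger(0);
  private final AtomicLong failedAttempts = new AtomicLong(0);

  // Makes up to maxRetries + 1 attempts; returns true once the commit succeeds.
  boolean commit(int maxRetries) {
    for (int attempt = 0; attempt <= maxRetries; attempt++) {
      int base = version.get();
      Thread.yield(); // widen the race window so conflicts are more likely
      if (version.compareAndSet(base, base + 1)) {
        return true;
      }
      failedAttempts.incrementAndGet(); // another writer won; retry on the new base
    }
    return false;
  }

  // Returns {final version, successful commits, failed attempts}.
  static long[] run(int threads, int maxRetries) throws InterruptedException {
    CommitRetrySketch table = new CommitRetrySketch();
    AtomicInteger successes = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 0; i < threads; i++) {
      pool.submit(() -> {
        if (table.commit(maxRetries)) {
          successes.incrementAndGet();
        }
      });
    }
    pool.shutdown();
    if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
      throw new IllegalStateException("commits did not finish in time");
    }
    return new long[] {table.version.get(), successes.get(), table.failedAttempts.get()};
  }

  public static void main(String[] args) throws InterruptedException {
    for (int threads : new int[] {4, 30}) {
      long[] result = run(threads, 1000);
      System.out.println(threads + " threads: version=" + result[0]
          + ", successes=" + result[1] + ", failed attempts=" + result[2]);
    }
  }
}
```

The failed-attempt count varies from run to run, which is the sort of nondeterminism that makes me nervous on a shared, busy CI host.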
##########
File path: core/src/test/java/org/apache/iceberg/hadoop/TestHadoopCommits.java
##########
@@ -419,4 +422,33 @@ public void testCanReadOldCompressedManifestFiles() throws Exception {
List<FileScanTask> tasks = Lists.newArrayList(reloaded.newScan().planFiles());
Assert.assertEquals("Should scan 1 files", 1, tasks.size());
}
+
+ @Test
+ public void testConcurrentAppend() throws Exception {
+ assertTrue("Should create v1 metadata",
+ version(1).exists() && version(1).isFile());
+ File dir = temp.newFolder();
+ dir.delete();
+
+ Table tableWithHighRetries = TABLES.create(SCHEMA, SPEC, new HashMap<String, String>() {
+ {
+ put(TableProperties.COMMIT_NUM_RETRIES, "1000");
+ }
+ }, dir.toURI().toString());
+ int threadsCount = 30;
Review comment:
Nit / Somewhat blocking: Can we consider lowering this value? CI already
takes some time, and I feel like spawning 30 threads in a test in our CI
environment, where each of those threads runs a Spark job, might be a little
heavy given the very large number of concurrent users as well as the limited
testing resources we get from GitHub.
Would 4 or 5 threads be OK for all of the tests? Spawning 30 OS threads in
a single test is likely to cause a large degree of contention.
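For what it's worth, a rough sketch of the shape I have in mind is below. It is deliberately free of Iceberg classes so it runs on its own: `BoundedConcurrencySketch`, `runConcurrently`, and the counter in `main` are hypothetical names, and the `Runnable` stands in for the real `newAppend().appendFile(...).commit()` call. It uses a small fixed pool and a per-task timeout, and it lets failures from the worker tasks propagate instead of swallowing them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Iceberg's test utilities.
class BoundedConcurrencySketch {

  // Runs commitAction once per thread and returns how many tasks completed.
  // A task that throws, or exceeds the timeout, fails the caller instead of
  // being silently ignored.
  static int runConcurrently(int threadsCount, Runnable commitAction, long timeoutSeconds)
      throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(threadsCount);
    try {
      List<Future<?>> futures = new ArrayList<>();
      for (int i = 0; i < threadsCount; i++) {
        futures.add(pool.submit(commitAction));
      }
      int completed = 0;
      for (Future<?> future : futures) {
        future.get(timeoutSeconds, TimeUnit.SECONDS); // rethrows task failures
        completed++;
      }
      return completed;
    } finally {
      pool.shutdownNow(); // never leave worker threads behind on the CI host
    }
  }

  public static void main(String[] args) throws Exception {
    int threadsCount = 4; // small enough to be cheap, still exercises concurrency
    AtomicInteger commits = new AtomicInteger(); // stand-in for the snapshot count
    int completed = runConcurrently(threadsCount, commits::incrementAndGet, 30);
    System.out.println(completed + " tasks completed, " + commits.get() + " commits recorded");
  }
}
```

In the real test the final assertion would still compare `threadsCount` against the number of snapshots; only the thread management around it would change.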
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]