rdblue commented on a change in pull request #2213:
URL: https://github.com/apache/iceberg/pull/2213#discussion_r571340161
##########
File path:
spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMerge.java
##########
@@ -50,6 +74,361 @@ public void removeTables() {
// TODO: add tests for multiple NOT MATCHED clauses when we move to Spark 3.1
+ @Test
+ public void testSelfMerge() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": 1, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+
+ sql("MERGE INTO %s t USING %s s " +
+ "ON t.id == s.id " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET v = 'x' " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT *", tableName, tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "x"), // updated
+ row(2, "v2") // kept
+ );
+ assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public void testMergeWithSourceAsSelfSubquery() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": 1, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+
+ createOrReplaceView("source", Arrays.asList(1, null), Encoders.INT());
+
+ sql("MERGE INTO %s t USING (SELECT id AS value FROM %s r JOIN source ON r.id = source.value) s " +
+ "ON t.id == s.value " +
+ "WHEN MATCHED AND t.id = 1 THEN " +
+ " UPDATE SET v = 'x' " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT (v, id) VALUES ('invalid', -1) ", tableName, tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "x"), // updated
+ row(2, "v2") // kept
+ );
+ assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public synchronized void testMergeWithSerializableIsolation() throws InterruptedException {
+ // cannot run tests with concurrency for Hadoop tables without atomic renames
+ Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+ createAndInitTable("id INT, dep STRING");
+ createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "serializable");
+
+ ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+ AtomicInteger barrier = new AtomicInteger(0);
+
+ // merge thread
+ Future<?> mergeFuture = executorService.submit(() -> {
+ for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+ while (barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+ sql("MERGE INTO %s t USING source s " +
+ "ON t.id == s.value " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET dep = 'x'", tableName);
+ barrier.incrementAndGet();
+ }
+ });
+
+ // append thread
+ Future<?> appendFuture = executorService.submit(() -> {
+ for (int numOperations = 0; numOperations < Integer.MAX_VALUE; numOperations++) {
+ while (barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+ barrier.incrementAndGet();
+ }
+ });
+
+ try {
+ mergeFuture.get();
+ Assert.fail("Expected a validation exception");
+ } catch (ExecutionException e) {
+ Throwable sparkException = e.getCause();
+ Assert.assertThat(sparkException, CoreMatchers.instanceOf(SparkException.class));
+ Throwable validationException = sparkException.getCause();
+ Assert.assertThat(validationException, CoreMatchers.instanceOf(ValidationException.class));
+ String errMsg = validationException.getMessage();
+ Assert.assertThat(errMsg, CoreMatchers.containsString("Found conflicting files that can contain"));
+ } finally {
+ appendFuture.cancel(true);
+ }
+
+ executorService.shutdown();
+ Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+ }
+
+ @Test
+ public synchronized void testMergeWithSnapshotIsolation() throws InterruptedException, ExecutionException {
+ // cannot run tests with concurrency for Hadoop tables without atomic renames
+ Assume.assumeFalse(catalogName.equalsIgnoreCase("testhadoop"));
+
+ createAndInitTable("id INT, dep STRING");
+ createOrReplaceView("source", Collections.singletonList(1), Encoders.INT());
+
+ sql("ALTER TABLE %s SET TBLPROPERTIES('%s' '%s')", tableName, MERGE_ISOLATION_LEVEL, "snapshot");
+
+ ExecutorService executorService = MoreExecutors.getExitingExecutorService(
+ (ThreadPoolExecutor) Executors.newFixedThreadPool(2));
+
+ AtomicInteger barrier = new AtomicInteger(0);
+
+ // merge thread
+ Future<?> mergeFuture = executorService.submit(() -> {
+ for (int numOperations = 0; numOperations < 20; numOperations++) {
+ while (barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+ sql("MERGE INTO %s t USING source s " +
+ "ON t.id == s.value " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET dep = 'x'", tableName);
+ barrier.incrementAndGet();
+ }
+ });
+
+ // append thread
+ Future<?> appendFuture = executorService.submit(() -> {
+ for (int numOperations = 0; numOperations < 20; numOperations++) {
+ while (barrier.get() < numOperations * 2) {
+ sleep(10);
+ }
+ sql("INSERT INTO TABLE %s VALUES (1, 'hr')", tableName);
+ barrier.incrementAndGet();
+ }
+ });
+
+ try {
+ mergeFuture.get();
+ } finally {
+ appendFuture.cancel(true);
+ }
+
+ executorService.shutdown();
+ Assert.assertTrue("Timeout", executorService.awaitTermination(2, TimeUnit.MINUTES));
+ }
+
+ @Test
+ public void testMergeWithExtraColumnsInSource() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": 1, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+ createOrReplaceView("source",
+ "{ \"id\": 1, \"extra_col\": -1, \"v\": \"v1_1\" }\n" +
+ "{ \"id\": 3, \"extra_col\": -1, \"v\": \"v3\" }\n" +
+ "{ \"id\": 4, \"extra_col\": -1, \"v\": \"v4\" }");
+
+ sql("MERGE INTO %s t USING source " +
+ "ON t.id == source.id " +
+ "WHEN MATCHED THEN " +
+ " UPDATE SET v = source.v " +
+ "WHEN NOT MATCHED THEN " +
+ " INSERT (v, id) VALUES (source.v, source.id)", tableName);
+
+ ImmutableList<Object[]> expectedRows = ImmutableList.of(
+ row(1, "v1_1"), // new
+ row(2, "v2"), // kept
+ row(3, "v3"), // new
+ row(4, "v4") // new
+ );
+ assertEquals("Output should match", expectedRows, sql("SELECT * FROM %s ORDER BY id", tableName));
+ }
+
+ @Test
+ public void testMergeWithNullsInTargetAndSource() {
+ createAndInitTable("id INT, v STRING",
+ "{ \"id\": null, \"v\": \"v1\" }\n" +
+ "{ \"id\": 2, \"v\": \"v2\" }");
+
+ createOrReplaceView("source",
+ "{ \"id\": null, \"v\": \"v1_1\" }\n" +
+ "{ \"id\": 4, \"v\": \"v4\" }");
+
+ sql("MERGE INTO %s t USING source " +
+ "ON t.id == source.id " +
Review comment:
I didn't know you could use `==`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]