http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java ---------------------------------------------------------------------- diff --git a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java index 2273e06..d2c89e5 100644 --- a/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java +++ b/hcatalog/streaming/src/test/org/apache/hive/hcatalog/streaming/mutate/worker/TestMutatorImpl.java @@ -49,7 +49,7 @@ public class TestMutatorImpl { private static final int RECORD_ID_COLUMN = 2; private static final int BUCKET_ID = 0; private static final Path PATH = new Path("X"); - private static final long TRANSACTION_ID = 1L; + private static final long WRITE_ID = 1L; @Mock private AcidOutputFormat<?, ?> mockOutputFormat; @@ -67,7 +67,7 @@ public class TestMutatorImpl { @Before public void injectMocks() throws IOException { when(mockOutputFormat.getRecordUpdater(eq(PATH), any(Options.class))).thenReturn(mockRecordUpdater); - mutator = new MutatorImpl(configuration, RECORD_ID_COLUMN, mockObjectInspector, mockOutputFormat, TRANSACTION_ID, + mutator = new MutatorImpl(configuration, RECORD_ID_COLUMN, mockObjectInspector, mockOutputFormat, WRITE_ID, PATH, BUCKET_ID); } @@ -79,26 +79,26 @@ public class TestMutatorImpl { assertThat(options.getConfiguration(), is((Configuration) configuration)); assertThat(options.getInspector(), is(mockObjectInspector)); assertThat(options.getRecordIdColumn(), is(RECORD_ID_COLUMN)); - assertThat(options.getMinimumTransactionId(), is(TRANSACTION_ID)); - assertThat(options.getMaximumTransactionId(), is(TRANSACTION_ID)); + assertThat(options.getMinimumWriteId(), is(WRITE_ID)); + assertThat(options.getMaximumWriteId(), is(WRITE_ID)); } @Test public void testInsertDelegates() throws IOException { mutator.insert(RECORD); - verify(mockRecordUpdater).insert(TRANSACTION_ID, RECORD); + verify(mockRecordUpdater).insert(WRITE_ID, RECORD); } @Test public void testUpdateDelegates() throws IOException { mutator.update(RECORD); - verify(mockRecordUpdater).update(TRANSACTION_ID, RECORD); + verify(mockRecordUpdater).update(WRITE_ID, RECORD); } @Test public void testDeleteDelegates() throws IOException { mutator.delete(RECORD); - verify(mockRecordUpdater).delete(TRANSACTION_ID, RECORD); + verify(mockRecordUpdater).delete(WRITE_ID, RECORD); } @Test
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java index 7967a24..53ae2c0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java @@ -244,7 +244,7 @@ public class TestAcidOnTez { {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":0}\t1\t2", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":1}\t3\t4", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "1/000000_0"}, {"{\"transactionid\":0,\"bucketid\":536870912,\"rowid\":3}\t9\t10", AbstractFileMergeOperator.UNION_SUDBIR_PREFIX + "2/000000_0"}, - {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000021_0000021_0000/bucket_00000"} + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000001_0000001_0000/bucket_00000"} }; Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); //verify data and layout @@ -256,7 +256,7 @@ public class TestAcidOnTez { FileSystem fs = FileSystem.get(hiveConf); FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000"}; + String[] expectedDelDelta = {"delete_delta_0000001_0000001_0000", "delete_delta_0000002_0000002_0000"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta.length; i++) { if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) { @@ -285,7 +285,7 @@ public class TestAcidOnTez { //check we have right delete delta files after minor compaction status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.NONACIDNONBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta2 = {"delete_delta_0000021_0000021_0000", "delete_delta_0000022_0000022_0000", "delete_delta_0000021_0000022"}; + String[] expectedDelDelta2 = {"delete_delta_0000001_0000001_0000", "delete_delta_0000002_0000002_0000", "delete_delta_0000001_0000002"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta2.length; i++) { if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) { @@ -309,7 +309,7 @@ public class TestAcidOnTez { for(int i = 0; i < expected2.length; i++) { Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); //everything is now in base/ - Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000022/bucket_00000")); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000002/bucket_00000")); } } /** @@ -453,12 +453,12 @@ public class TestAcidOnTez { /* * Expected result 0th entry is the RecordIdentifier + data. 
1st entry file before compact*/ String expected[][] = { - {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"}, - {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"}, - {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0001/bucket_00000"}, - {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"}, - {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000018_0000018_0002/bucket_00000"}, - {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000018_0000018_0002/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":2}\t5\t6", "/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":2}\t5\t6", "/delta_0000001_0000001_0002/bucket_00000"}, }; Assert.assertEquals("Unexpected row count after ctas", expected.length, rs.size()); //verify data and layout @@ -475,10 +475,10 @@ public class TestAcidOnTez { LOG.warn(s); } String[][] expected2 = { - {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000018_0000018_0001/bucket_00000"}, - {"{\"transactionid\":18,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000018_0000018_0001/bucket_00000"}, - {"{\"transactionid\":18,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000018_0000018_0002/bucket_00000"}, - {"{\"transactionid\":20,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "delta_0000020_0000020_0000/bucket_00000"} + {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":0}\t9\t10", "/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"transactionid\":2,\"bucketid\":536870912,\"rowid\":0}\t70\t80", "/delta_0000002_0000002_0000/bucket_00000"} }; Assert.assertEquals("Unexpected row count after update", expected2.length, rs.size()); //verify data and layout @@ -490,7 +490,7 @@ public class TestAcidOnTez { FileSystem fs = FileSystem.get(hiveConf); FileStatus[] status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.ACIDNOBUCKET).toString().toLowerCase()), FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta = {"delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000"}; + String[] expectedDelDelta = {"delete_delta_0000002_0000002_0000", "delete_delta_0000003_0000003_0000"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta.length; i++) { if(expectedDelDelta[i] != null && stat.getPath().toString().endsWith(expectedDelDelta[i])) { @@ -519,7 +519,7 @@ public class TestAcidOnTez { //check we have right delete delta files after minor compaction status = fs.listStatus(new Path(TEST_WAREHOUSE_DIR + "/" + (Table.ACIDNOBUCKET).toString().toLowerCase()), 
FileUtils.STAGING_DIR_PATH_FILTER); - String[] expectedDelDelta2 = { "delete_delta_0000020_0000020_0000", "delete_delta_0000021_0000021_0000", "delete_delta_0000018_0000021"}; + String[] expectedDelDelta2 = { "delete_delta_0000002_0000002_0000", "delete_delta_0000003_0000003_0000", "delete_delta_0000001_0000003"}; for(FileStatus stat : status) { for(int i = 0; i < expectedDelDelta2.length; i++) { if(expectedDelDelta2[i] != null && stat.getPath().toString().endsWith(expectedDelDelta2[i])) { @@ -543,7 +543,7 @@ public class TestAcidOnTez { for(int i = 0; i < expected2.length; i++) { Assert.assertTrue("Actual line " + i + " bc: " + rs.get(i), rs.get(i).startsWith(expected2[i][0])); //everything is now in base/ - Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000021/bucket_00000")); + Assert.assertTrue("Actual line(file) " + i + " bc: " + rs.get(i), rs.get(i).endsWith("base_0000003/bucket_00000")); } } /** @@ -638,17 +638,17 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h âââ HIVE_UNION_SUBDIR_1 â  âââ 000000_0 â  âââ _orc_acid_version - â  âââ delta_0000019_0000019_0001 + â  âââ delta_0000001_0000001_0001 â  âââ bucket_00000 âââ HIVE_UNION_SUBDIR_2 â  âââ 000000_0 â  âââ _orc_acid_version - â  âââ delta_0000019_0000019_0002 + â  âââ delta_0000001_0000001_0002 â  âââ bucket_00000 âââ HIVE_UNION_SUBDIR_3 âââ 000000_0 âââ _orc_acid_version - âââ delta_0000019_0000019_0003 + âââ delta_0000001_0000001_0003 âââ bucket_00000 10 directories, 6 files */ @@ -660,11 +660,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h } String[][] expected2 = { - {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000019_0000019_0001/bucket_00000"}, - {"{\"transactionid\":19,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000019_0000019_0001/bucket_00000"}, - {"{\"transactionid\":19,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000019_0000019_0002/bucket_00000"}, - {"{\"transactionid\":19,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000019_0000019_0002/bucket_00000"}, - {"{\"transactionid\":19,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000019_0000019_0003/bucket_00000"} + {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870913,\"rowid\":1}\t3\t4", "warehouse/t/delta_0000001_0000001_0001/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":0}\t5\t6", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870914,\"rowid\":1}\t7\t8", "warehouse/t/delta_0000001_0000001_0002/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536870915,\"rowid\":0}\t9\t10", "warehouse/t/delta_0000001_0000001_0003/bucket_00000"} }; Assert.assertEquals("Unexpected row count", expected2.length, rs.size()); for(int i = 0; i < expected2.length; i++) { @@ -688,11 +688,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h âââ -ext-10000 âââ 000000_0 â  âââ _orc_acid_version - â  âââ delta_0000021_0000021_0000 + â  âââ delta_0000001_0000001_0000 â  âââ bucket_00000 âââ 000001_0 âââ _orc_acid_version - âââ delta_0000021_0000021_0000 + âââ delta_0000001_0000001_0000 âââ bucket_00001 5 directories, 4 files @@ -705,11 +705,11 @@ ekoifman:apache-hive-3.0.0-SNAPSHOT-bin ekoifman$ tree ~/dev/hiverwgit/itests/h LOG.warn(s); 
} String[][] expected2 = { - {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"}, - {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000021_0000021_0000/bucket_00000"}, - {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":2}\t5\t6", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"}, - {"{\"transactionid\":21,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000021_0000021_0000/bucket_00000"}, - {"{\"transactionid\":21,\"bucketid\":536936448,\"rowid\":1}\t9\t10", "warehouse/t/delta_0000021_0000021_0000/bucket_00001"} + {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":0}\t1\t2", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t4", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":2}\t5\t6", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"}, + {"{\"transactionid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t8", "warehouse/t/delta_0000001_0000001_0000/bucket_00000"}, + {"{\"transactionid\":1,\"bucketid\":536936448,\"rowid\":1}\t9\t10", "warehouse/t/delta_0000001_0000001_0000/bucket_00001"} }; Assert.assertEquals("Unexpected row count", expected2.length, rs.size()); for(int i = 0; i < expected2.length; i++) { http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java index 6dd7305..0410fb0 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactor.java @@ -36,7 +36,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.cli.CliSessionState; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -661,17 +661,18 @@ public class TestCompactor { Path resultFile = null; for (int i = 0; i < names.length; i++) { names[i] = stat[i].getPath().getName(); - if (names[i].equals("delta_0000003_0000006")) { + if (names[i].equals("delta_0000001_0000004")) { resultFile = stat[i].getPath(); } } Arrays.sort(names); - String[] expected = new String[]{"delta_0000003_0000004", - "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"}; + String[] expected = new String[]{"delta_0000001_0000002", + "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}; if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } - checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1); + checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty, + 0, 1L, 4L, 1); } finally { connection.close(); @@ -721,11 +722,11 @@ public class TestCompactor { FileStatus[] stat = 
fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); if (1 != stat.length) { - Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat)); + Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); } String name = stat[0].getPath().getName(); - Assert.assertEquals(name, "base_0000006"); - checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1); + Assert.assertEquals(name, "base_0000004"); + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1); } finally { connection.close(); } @@ -781,17 +782,17 @@ public class TestCompactor { Path resultDelta = null; for (int i = 0; i < names.length; i++) { names[i] = stat[i].getPath().getName(); - if (names[i].equals("delta_0000003_0000006")) { + if (names[i].equals("delta_0000001_0000004")) { resultDelta = stat[i].getPath(); } } Arrays.sort(names); - String[] expected = new String[]{"delta_0000003_0000004", - "delta_0000003_0000006", "delta_0000005_0000006"}; + String[] expected = new String[]{"delta_0000001_0000002", + "delta_0000001_0000004", "delta_0000003_0000004"}; if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } - checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1); + checkExpectedTxnsPresent(null, new Path[]{resultDelta}, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1); } finally { connection.close(); } @@ -847,13 +848,13 @@ public class TestCompactor { Assert.fail("majorCompactAfterAbort FileStatus[] stat " + Arrays.toString(stat)); } if (1 != stat.length) { - Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat)); + Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); } String name = stat[0].getPath().getName(); - if (!name.equals("base_0000006")) { - Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000006"); + if (!name.equals("base_0000004")) { + Assert.fail("majorCompactAfterAbort name " + name + " not equals to base_0000004"); } - checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1); + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 1); } finally { connection.close(); } @@ -902,11 +903,11 @@ public class TestCompactor { FileStatus[] stat = fs.listStatus(new Path(table.getSd().getLocation()), AcidUtils.baseFileFilter); if (1 != stat.length) { - Assert.fail("Expecting 1 file \"base_0000006\" and found " + stat.length + " files " + Arrays.toString(stat)); + Assert.fail("Expecting 1 file \"base_0000004\" and found " + stat.length + " files " + Arrays.toString(stat)); } String name = stat[0].getPath().getName(); - Assert.assertEquals(name, "base_0000006"); - checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 2); + Assert.assertEquals(name, "base_0000004"); + checkExpectedTxnsPresent(stat[0].getPath(), null, columnNamesProperty, columnTypesProperty, 0, 1L, 4L, 2); } finally { connection.close(); } @@ -960,16 +961,17 @@ public class TestCompactor { Path minorCompactedDelta = null; for (int i = 0; i < deltas.length; i++) { deltas[i] = stat[i].getPath().getName(); - if 
(deltas[i].equals("delta_0000003_0000005")) { + if (deltas[i].equals("delta_0000001_0000003")) { minorCompactedDelta = stat[i].getPath(); } } Arrays.sort(deltas); - String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000005", "delta_0000004_0000004_0000"}; + String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000003", "delta_0000002_0000002_0000"}; if (!Arrays.deepEquals(expectedDeltas, deltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); } - checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1); + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, + 0, 1L, 2L, 1); // Verify that we have got correct set of delete_deltas. FileStatus[] deleteDeltaStat = @@ -978,16 +980,17 @@ public class TestCompactor { Path minorCompactedDeleteDelta = null; for (int i = 0; i < deleteDeltas.length; i++) { deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); - if (deleteDeltas[i].equals("delete_delta_0000003_0000005")) { + if (deleteDeltas[i].equals("delete_delta_0000001_0000003")) { minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); } } Arrays.sort(deleteDeltas); - String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000005", "delete_delta_0000005_0000005_0000"}; + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000003", "delete_delta_0000003_0000003_0000"}; if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } - checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, 0, 4L, 4L, 1); + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDeleteDelta}, columnNamesProperty, columnTypesProperty, + 0, 2L, 2L, 1); } @Test @@ -1037,16 +1040,17 @@ public class TestCompactor { Path minorCompactedDelta = null; for (int i = 0; i < deltas.length; i++) { deltas[i] = stat[i].getPath().getName(); - if (deltas[i].equals("delta_0000003_0000004")) { + if (deltas[i].equals("delta_0000001_0000002")) { minorCompactedDelta = stat[i].getPath(); } } Arrays.sort(deltas); - String[] expectedDeltas = new String[]{"delta_0000003_0000003_0000", "delta_0000003_0000004", "delta_0000004_0000004_0000"}; + String[] expectedDeltas = new String[]{"delta_0000001_0000001_0000", "delta_0000001_0000002", "delta_0000002_0000002_0000"}; if (!Arrays.deepEquals(expectedDeltas, deltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeltas) + ", found: " + Arrays.toString(deltas)); } - checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, 0, 3L, 4L, 1); + checkExpectedTxnsPresent(null, new Path[]{minorCompactedDelta}, columnNamesProperty, columnTypesProperty, + 0, 1L, 2L, 1); // Verify that we have got correct set of delete_deltas. 
FileStatus[] deleteDeltaStat = @@ -1055,12 +1059,12 @@ public class TestCompactor { Path minorCompactedDeleteDelta = null; for (int i = 0; i < deleteDeltas.length; i++) { deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); - if (deleteDeltas[i].equals("delete_delta_0000003_0000004")) { + if (deleteDeltas[i].equals("delete_delta_0000001_0000002")) { minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); } } Arrays.sort(deleteDeltas); - String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000004"}; + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000002"}; if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } @@ -1114,17 +1118,18 @@ public class TestCompactor { Path resultFile = null; for (int i = 0; i < names.length; i++) { names[i] = stat[i].getPath().getName(); - if (names[i].equals("delta_0000003_0000006")) { + if (names[i].equals("delta_0000001_0000004")) { resultFile = stat[i].getPath(); } } Arrays.sort(names); - String[] expected = new String[]{"delta_0000003_0000004", - "delta_0000003_0000006", "delta_0000005_0000006", "delta_0000007_0000008"}; + String[] expected = new String[]{"delta_0000001_0000002", + "delta_0000001_0000004", "delta_0000003_0000004", "delta_0000005_0000006"}; if (!Arrays.deepEquals(expected, names)) { Assert.fail("Expected: " + Arrays.toString(expected) + ", found: " + Arrays.toString(names)); } - checkExpectedTxnsPresent(null, new Path[]{resultFile},columnNamesProperty, columnTypesProperty, 0, 3L, 6L, 1); + checkExpectedTxnsPresent(null, new Path[]{resultFile}, columnNamesProperty, columnTypesProperty, + 0, 1L, 4L, 1); // Verify that we have got correct set of delete_deltas also FileStatus[] deleteDeltaStat = @@ -1133,12 +1138,12 @@ public class TestCompactor { Path minorCompactedDeleteDelta = null; for (int i = 0; i < deleteDeltas.length; i++) { deleteDeltas[i] = deleteDeltaStat[i].getPath().getName(); - if (deleteDeltas[i].equals("delete_delta_0000003_0000006")) { + if (deleteDeltas[i].equals("delete_delta_0000001_0000004")) { minorCompactedDeleteDelta = deleteDeltaStat[i].getPath(); } } Arrays.sort(deleteDeltas); - String[] expectedDeleteDeltas = new String[]{"delete_delta_0000003_0000006"}; + String[] expectedDeleteDeltas = new String[]{"delete_delta_0000001_0000004"}; if (!Arrays.deepEquals(expectedDeleteDeltas, deleteDeltas)) { Assert.fail("Expected: " + Arrays.toString(expectedDeleteDeltas) + ", found: " + Arrays.toString(deleteDeltas)); } @@ -1330,14 +1335,19 @@ public class TestCompactor { private void checkExpectedTxnsPresent(Path base, Path[] deltas, String columnNamesProperty, String columnTypesProperty, int bucket, long min, long max, int numBuckets) throws IOException { - ValidTxnList txnList = new ValidTxnList() { + ValidWriteIdList writeIdList = new ValidWriteIdList() { @Override - public boolean isTxnValid(long txnid) { + public String getTableName() { + return "AcidTable"; + } + + @Override + public boolean isWriteIdValid(long writeid) { return true; } @Override - public RangeResponse isTxnRangeValid(long minTxnId, long maxTxnId) { + public RangeResponse isWriteIdRangeValid(long minWriteId, long maxWriteId) { return RangeResponse.ALL; } @@ -1352,7 +1362,9 @@ public class TestCompactor { } @Override - public Long getMinOpenTxn() { return null; } + public Long getMinOpenWriteId() { + return null; + } @Override public long getHighWatermark() { @@ -1360,7 +1372,7 @@ public class 
TestCompactor { } @Override - public long[] getInvalidTransactions() { + public long[] getInvalidWriteIds() { return new long[0]; } @Override @@ -1369,12 +1381,12 @@ public class TestCompactor { } @Override - public boolean isTxnAborted(long txnid) { + public boolean isWriteIdAborted(long txnid) { return true; } @Override - public RangeResponse isTxnRangeAborted(long minTxnId, long maxTxnId) { + public RangeResponse isWriteIdRangeAborted(long minWriteId, long maxWriteId) { return RangeResponse.ALL; } }; @@ -1387,18 +1399,18 @@ public class TestCompactor { conf.set(hive_metastoreConstants.BUCKET_COUNT, Integer.toString(numBuckets)); HiveConf.setBoolVar(conf, HiveConf.ConfVars.HIVE_TRANSACTIONAL_TABLE_SCAN, true); AcidInputFormat.RawReader<OrcStruct> reader = - aif.getRawReader(conf, true, bucket, txnList, base, deltas); + aif.getRawReader(conf, true, bucket, writeIdList, base, deltas); RecordIdentifier identifier = reader.createKey(); OrcStruct value = reader.createValue(); long currentTxn = min; boolean seenCurrentTxn = false; while (reader.next(identifier, value)) { if (!seenCurrentTxn) { - Assert.assertEquals(currentTxn, identifier.getTransactionId()); + Assert.assertEquals(currentTxn, identifier.getWriteId()); seenCurrentTxn = true; } - if (currentTxn != identifier.getTransactionId()) { - Assert.assertEquals(currentTxn + 1, identifier.getTransactionId()); + if (currentTxn != identifier.getWriteId()) { + Assert.assertEquals(currentTxn + 1, identifier.getWriteId()); currentTxn++; } } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/metastore/scripts/upgrade/derby/044-HIVE-16997.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/044-HIVE-16997.derby.sql b/metastore/scripts/upgrade/derby/044-HIVE-16997.derby.sql index ae9649c..2c2177b 100644 --- a/metastore/scripts/upgrade/derby/044-HIVE-16997.derby.sql +++ b/metastore/scripts/upgrade/derby/044-HIVE-16997.derby.sql @@ -1,2 +1 @@ ALTER TABLE "APP"."PART_COL_STATS" ADD COLUMN "BIT_VECTOR" BLOB; -ALTER TABLE "APP"."TAB_COL_STATS" ADD COLUMN "BIT_VECTOR" BLOB; http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/metastore/scripts/upgrade/derby/050-HIVE-18192.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/050-HIVE-18192.derby.sql b/metastore/scripts/upgrade/derby/050-HIVE-18192.derby.sql new file mode 100644 index 0000000..b0bc5b1 --- /dev/null +++ b/metastore/scripts/upgrade/derby/050-HIVE-18192.derby.sql @@ -0,0 +1,27 @@ +-- Add new tables/index for per table write id support +CREATE TABLE TXN_TO_WRITE_ID ( + T2W_TXNID bigint NOT NULL, + T2W_DATABASE varchar(128) NOT NULL, + T2W_TABLE varchar(256) NOT NULL, + T2W_WRITEID bigint NOT NULL +); + +CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID); + +CREATE TABLE NEXT_WRITE_ID ( + NWI_DATABASE varchar(128) NOT NULL, + NWI_TABLE varchar(256) NOT NULL, + NWI_NEXT bigint NOT NULL +); + +CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); + +-- Modify txn_components/completed_txn_components tables to add write id. 
+ALTER TABLE TXN_COMPONENTS ADD TC_WRITEID bigint; +ALTER TABLE COMPLETED_TXN_COMPONENTS ADD CTC_WRITEID bigint; + +-- Modify Compaction related tables to use write id instead of txn id +RENAME COLUMN COMPACTION_QUEUE.CQ_HIGHEST_TXN_ID TO CQ_HIGHEST_WRITE_ID; +RENAME COLUMN COMPLETED_COMPACTIONS.CC_HIGHEST_TXN_ID TO CC_HIGHEST_WRITE_ID; + + http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql index 85d593f..2033bdc 100644 --- a/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql +++ b/metastore/scripts/upgrade/derby/hive-txn-schema-3.0.0.derby.sql @@ -33,7 +33,8 @@ CREATE TABLE TXN_COMPONENTS ( TC_DATABASE varchar(128) NOT NULL, TC_TABLE varchar(128), TC_PARTITION varchar(767), - TC_OPERATION_TYPE char(1) NOT NULL + TC_OPERATION_TYPE char(1) NOT NULL, + TC_WRITEID bigint ); CREATE INDEX TC_TXNID_INDEX ON TXN_COMPONENTS (TC_TXNID); @@ -43,7 +44,8 @@ CREATE TABLE COMPLETED_TXN_COMPONENTS ( CTC_DATABASE varchar(128) NOT NULL, CTC_TABLE varchar(256), CTC_PARTITION varchar(767), - CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL + CTC_TIMESTAMP timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL, + CTC_WRITEID bigint ); CREATE INDEX COMPLETED_TXN_COMPONENTS_IDX ON COMPLETED_TXN_COMPONENTS (CTC_DATABASE, CTC_TABLE, CTC_PARTITION); @@ -53,6 +55,23 @@ CREATE TABLE NEXT_TXN_ID ( ); INSERT INTO NEXT_TXN_ID VALUES(1); +CREATE TABLE TXN_TO_WRITE_ID ( + T2W_TXNID bigint NOT NULL, + T2W_DATABASE varchar(128) NOT NULL, + T2W_TABLE varchar(256) NOT NULL, + T2W_WRITEID bigint NOT NULL +); + +CREATE UNIQUE INDEX TXN_TO_WRITE_ID_IDX ON TXN_TO_WRITE_ID (T2W_DATABASE, T2W_TABLE, T2W_TXNID); + +CREATE TABLE NEXT_WRITE_ID ( + NWI_DATABASE varchar(128) NOT NULL, + NWI_TABLE varchar(256) NOT NULL, + NWI_NEXT bigint NOT NULL +); + +CREATE UNIQUE INDEX NEXT_WRITE_ID_IDX ON NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE); + CREATE TABLE HIVE_LOCKS ( HL_LOCK_EXT_ID bigint NOT NULL, HL_LOCK_INT_ID bigint NOT NULL, @@ -91,7 +110,7 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_WORKER_ID varchar(128), CQ_START bigint, CQ_RUN_AS varchar(128), - CQ_HIGHEST_TXN_ID bigint, + CQ_HIGHEST_WRITE_ID bigint, CQ_META_INFO varchar(2048) for bit data, CQ_HADOOP_JOB_ID varchar(32) ); @@ -113,7 +132,7 @@ CREATE TABLE COMPLETED_COMPACTIONS ( CC_START bigint, CC_END bigint, CC_RUN_AS varchar(128), - CC_HIGHEST_TXN_ID bigint, + CC_HIGHEST_WRITE_ID bigint, CC_META_INFO varchar(2048) for bit data, CC_HADOOP_JOB_ID varchar(32) ); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql ---------------------------------------------------------------------- diff --git a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql index 3a11881..55b89e7 100644 --- a/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql +++ b/metastore/scripts/upgrade/derby/upgrade-2.3.0-to-3.0.0.derby.sql @@ -7,5 +7,6 @@ RUN '045-HIVE-16886.derby.sql'; RUN '046-HIVE-17566.derby.sql'; RUN '048-HIVE-14498.derby.sql'; RUN '049-HIVE-18489.derby.sql'; +RUN '050-HIVE-18192.derby.sql'; UPDATE "APP".VERSION SET SCHEMA_VERSION='3.0.0', VERSION_COMMENT='Hive release version 3.0.0' where VER_ID=1; 
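The two tables created above back the new per-table write id allocation: NEXT_WRITE_ID keeps the next write id to hand out per (database, table), and TXN_TO_WRITE_ID records which write id a given transaction received for that table. A minimal sketch of the bookkeeping these tables support follows; the concrete values and the exact statement sequence are illustrative assumptions (the metastore performs this allocation internally as part of the HIVE-18192 work), not statements taken from this patch.

-- Illustrative sketch only; txn id 7 and table 'default'.'t' are made-up values.
-- Seed the per-table counter the first time the table is written to.
INSERT INTO NEXT_WRITE_ID (NWI_DATABASE, NWI_TABLE, NWI_NEXT) VALUES ('default', 't', 1);

-- Allocate a write id for txn 7 on default.t: read the current counter value,
-- advance the counter, and remember the txn -> write id mapping.
SELECT NWI_NEXT FROM NEXT_WRITE_ID WHERE NWI_DATABASE = 'default' AND NWI_TABLE = 't';
UPDATE NEXT_WRITE_ID SET NWI_NEXT = NWI_NEXT + 1 WHERE NWI_DATABASE = 'default' AND NWI_TABLE = 't';
INSERT INTO TXN_TO_WRITE_ID (T2W_TXNID, T2W_DATABASE, T2W_TABLE, T2W_WRITEID) VALUES (7, 'default', 't', 1);

Because the counter is kept per table rather than globally, write ids start from 1 for every table, which is why the expected delta and base directory names in the test changes above shrink from values like delta_0000021_0000021_0000 and base_0000022 to delta_0000001_0000001_0000 and base_0000002.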
http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index d00e639..94999fe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -44,8 +44,9 @@ import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.hive.common.JavaUtils; -import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidTxnWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.metrics.common.Metrics; import org.apache.hadoop.hive.common.metrics.common.MetricsConstant; import org.apache.hadoop.hive.common.metrics.common.MetricsFactory; @@ -64,6 +65,7 @@ import org.apache.hadoop.hive.ql.exec.ConditionalTask; import org.apache.hadoop.hive.ql.exec.DagUtils; import org.apache.hadoop.hive.ql.exec.ExplainTask; import org.apache.hadoop.hive.ql.exec.FetchTask; +import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; import org.apache.hadoop.hive.ql.exec.TaskFactory; @@ -76,6 +78,7 @@ import org.apache.hadoop.hive.ql.hooks.HookContext; import org.apache.hadoop.hive.ql.hooks.HookUtils; import org.apache.hadoop.hive.ql.hooks.ReadEntity; import org.apache.hadoop.hive.ql.hooks.WriteEntity; +import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.lockmgr.HiveLock; import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager; import org.apache.hadoop.hive.ql.lockmgr.LockException; @@ -1190,30 +1193,85 @@ public class Driver implements IDriver { return fetchTask; } - // Write the current set of valid transactions into the conf file so that it can be read by - // the input format. + // Write the current set of valid transactions into the conf file private void recordValidTxns(HiveTxnManager txnMgr) throws LockException { - ValidTxnList oldList = null; - String s = conf.get(ValidTxnList.VALID_TXNS_KEY); - if(s != null && s.length() > 0) { - oldList = new ValidReadTxnList(s); - } - ValidTxnList txns = txnMgr.getValidTxns(); - if(oldList != null) { + String oldTxnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + if ((oldTxnString != null) && (oldTxnString.length() > 0)) { throw new IllegalStateException("calling recordValidTxn() more than once in the same " + - JavaUtils.txnIdToString(txnMgr.getCurrentTxnId())); + JavaUtils.txnIdToString(txnMgr.getCurrentTxnId())); } - String txnStr = txns.toString(); + ValidTxnList txnList = txnMgr.getValidTxns(); + String txnStr = txnList.toString(); conf.set(ValidTxnList.VALID_TXNS_KEY, txnStr); - if(plan.getFetchTask() != null) { + LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId()); + } + + // Write the current set of valid write ids for the operated acid tables into the conf file so + // that it can be read by the input format. 
+ private void recordValidWriteIds(HiveTxnManager txnMgr) throws LockException { + String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY); + if ((txnString == null) || (txnString.isEmpty())) { + throw new IllegalStateException("calling recordValidWritsIdss() without initializing ValidTxnList " + + JavaUtils.txnIdToString(txnMgr.getCurrentTxnId())); + } + ValidTxnWriteIdList txnWriteIds = txnMgr.getValidWriteIds(getTransactionalTableList(plan), txnString); + String writeIdStr = txnWriteIds.toString(); + conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, writeIdStr); + if (plan.getFetchTask() != null) { /** * This is needed for {@link HiveConf.ConfVars.HIVEFETCHTASKCONVERSION} optimization which * initializes JobConf in FetchOperator before recordValidTxns() but this has to be done * after locks are acquired to avoid race conditions in ACID. + * This case is supported only for single source query. */ - plan.getFetchTask().setValidTxnList(txnStr); + Operator<?> source = plan.getFetchTask().getWork().getSource(); + if (source instanceof TableScanOperator) { + TableScanOperator tsOp = (TableScanOperator)source; + String fullTableName = AcidUtils.getFullTableName(tsOp.getConf().getDatabaseName(), + tsOp.getConf().getTableName()); + ValidWriteIdList writeIdList = txnWriteIds.getTableValidWriteIdList(fullTableName); + if (tsOp.getConf().isTranscationalTable() && (writeIdList == null)) { + throw new IllegalStateException("ACID table: " + fullTableName + + " is missing from the ValidWriteIdList config: " + writeIdStr); + } + if (writeIdList != null) { + plan.getFetchTask().setValidWriteIdList(writeIdList.toString()); + } + } + } + LOG.debug("Encoding valid txn write ids info " + writeIdStr + " txnid:" + txnMgr.getCurrentTxnId()); + } + + // Make the list of transactional tables list which are getting read or written by current txn + private List<String> getTransactionalTableList(QueryPlan plan) { + List<String> tableList = new ArrayList<>(); + + for (ReadEntity input : plan.getInputs()) { + addTableFromEntity(input, tableList); + } + return tableList; + } + + private void addTableFromEntity(Entity entity, List<String> tableList) { + Table tbl; + switch (entity.getType()) { + case TABLE: { + tbl = entity.getTable(); + break; + } + case PARTITION: + case DUMMYPARTITION: { + tbl = entity.getPartition().getTable(); + break; + } + default: { + return; + } + } + String fullTableName = AcidUtils.getFullTableName(tbl.getDbName(), tbl.getTableName()); + if (AcidUtils.isTransactionalTable(tbl) && !tableList.contains(fullTableName)) { + tableList.add(fullTableName); } - LOG.debug("Encoding valid txns info " + txnStr + " txnid:" + txnMgr.getCurrentTxnId()); } private String getUserFromUGI() { @@ -1256,7 +1314,7 @@ public class Driver implements IDriver { if(userFromUGI == null) { throw createProcessorResponse(10); } - // Set the transaction id in all of the acid file sinks + // Set the table write id in all of the acid file sinks if (haveAcidWrite()) { List<FileSinkDesc> acidSinks = new ArrayList<>(plan.getAcidSinks()); //sorting makes tests easier to write since file names and ROW__IDs depend on statementId @@ -1264,18 +1322,25 @@ public class Driver implements IDriver { acidSinks.sort((FileSinkDesc fsd1, FileSinkDesc fsd2) -> fsd1.getDirName().compareTo(fsd2.getDirName())); for (FileSinkDesc desc : acidSinks) { - desc.setTransactionId(queryTxnMgr.getCurrentTxnId()); + TableDesc tableInfo = desc.getTableInfo(); + long writeId = 
queryTxnMgr.getTableWriteId(Utilities.getDatabaseName(tableInfo.getTableName()), + Utilities.getTableName(tableInfo.getTableName())); + desc.setTableWriteId(writeId); + //it's possible to have > 1 FileSink writing to the same table/partition //e.g. Merge stmt, multi-insert stmt when mixing DP and SP writes - desc.setStatementId(queryTxnMgr.getWriteIdAndIncrement()); + desc.setStatementId(queryTxnMgr.getStmtIdAndIncrement()); } } /*It's imperative that {@code acquireLocks()} is called for all commands so that HiveTxnManager can transition its state machine correctly*/ queryTxnMgr.acquireLocks(plan, ctx, userFromUGI, lDrvState); - if(queryTxnMgr.recordSnapshot(plan)) { + if (queryTxnMgr.recordSnapshot(plan)) { recordValidTxns(queryTxnMgr); } + if (plan.hasAcidResourcesInQuery()) { + recordValidWriteIds(queryTxnMgr); + } } catch (Exception e) { errorMessage = "FAILED: Error in acquiring locks: " + e.getMessage(); SQLState = ErrorMsg.findSQLState(e.getMessage()); @@ -1317,6 +1382,7 @@ public class Driver implements IDriver { // If we've opened a transaction we need to commit or rollback rather than explicitly // releasing the locks. conf.unset(ValidTxnList.VALID_TXNS_KEY); + conf.unset(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY); if(!checkConcurrency()) { return; } @@ -1456,8 +1522,6 @@ public class Driver implements IDriver { private static final ReentrantLock globalCompileLock = new ReentrantLock(); private void compileInternal(String command, boolean deferClose) throws CommandProcessorResponse { - int ret; - Metrics metrics = MetricsFactory.getInstance(); if (metrics != null) { metrics.incrementCounter(MetricsConstant.WAITING_COMPILE_OPS, 1); @@ -1626,7 +1690,6 @@ public class Driver implements IDriver { throw cpr; } - //if needRequireLock is false, the release here will do nothing because there is no lock try { //since set autocommit starts an implicit txn, close it http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java index cf19351..df84417 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/AbstractFileMergeOperator.java @@ -260,7 +260,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> // There's always just one file that we have merged. // The union/DP/etc. should already be account for in the path. 
Utilities.writeMmCommitManifest(Lists.newArrayList(outPath), - tmpPath.getParent(), fs, taskId, conf.getTxnId(), conf.getStmtId(), null, false); + tmpPath.getParent(), fs, taskId, conf.getWriteId(), conf.getStmtId(), null, false); LOG.info("Merged into " + finalPath + "(" + fss.getLen() + " bytes)."); } } @@ -322,7 +322,7 @@ public abstract class AbstractFileMergeOperator<T extends FileMergeDesc> try { Path outputDir = conf.getOutputPath(); FileSystem fs = outputDir.getFileSystem(hconf); - Long mmWriteId = conf.getTxnId(); + Long mmWriteId = conf.getWriteId(); int stmtId = conf.getStmtId(); if (!isMmTable) { Path backupPath = backupOutputPath(fs, outputDir); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index 16b9107..f99178d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -63,6 +63,7 @@ import org.apache.hadoop.hive.common.FileUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; @@ -4264,7 +4265,7 @@ public class DDLTask extends Task<DDLWork> implements Serializable { } private void handleRemoveMm( - Path path, ValidTxnList validTxnList, List<Path> result) throws HiveException { + Path path, ValidWriteIdList validWriteIdList, List<Path> result) throws HiveException { // Note: doesn't take LB into account; that is not presently supported here (throws above). try { FileSystem fs = path.getFileSystem(conf); @@ -4274,10 +4275,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable { ensureDelete(fs, childPath, "a non-directory file"); continue; } - Long writeId = JavaUtils.extractTxnId(childPath); + Long writeId = JavaUtils.extractWriteId(childPath); if (writeId == null) { ensureDelete(fs, childPath, "an unknown directory"); - } else if (!validTxnList.isTxnValid(writeId)) { + } else if (!validWriteIdList.isWriteIdValid(writeId)) { // Assume no concurrent active writes - we rely on locks here. We could check and fail. 
ensureDelete(fs, childPath, "an uncommitted directory"); } else { @@ -4312,9 +4313,10 @@ public class DDLTask extends Task<DDLWork> implements Serializable { try { HiveTxnManager txnManager = SessionState.get().getTxnMgr(); if (txnManager.isTxnOpen()) { - mmWriteId = txnManager.getCurrentTxnId(); + mmWriteId = txnManager.getTableWriteId(tbl.getDbName(), tbl.getTableName()); } else { - mmWriteId = txnManager.openTxn(new Context(conf), conf.getUser()); + txnManager.openTxn(new Context(conf), conf.getUser()); + mmWriteId = txnManager.getTableWriteId(tbl.getDbName(), tbl.getTableName()); txnManager.commitTxn(); } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java index 97e1e36..969c591 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java @@ -37,8 +37,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.FileUtils; -import org.apache.hadoop.hive.common.ValidReadTxnList; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.mr.ExecMapperContext; import org.apache.hadoop.hive.ql.io.AcidUtils; @@ -151,8 +151,9 @@ public class FetchOperator implements Serializable { initialize(); } - public void setValidTxnList(String txnStr) { - job.set(ValidTxnList.VALID_TXNS_KEY, txnStr); + public void setValidWriteIdList(String writeIdStr) { + job.set(ValidWriteIdList.VALID_WRITEIDS_KEY, writeIdStr); + LOG.debug("FetchOperator set writeIdStr: " + writeIdStr); } private void initialize() throws HiveException { if (isStatReader) { @@ -274,7 +275,7 @@ public class FetchOperator implements Serializable { } FileSystem fs = currPath.getFileSystem(job); if (fs.exists(currPath)) { - if (extractValidTxnList() != null && + if (extractValidWriteIdList() != null && AcidUtils.isInsertOnlyTable(currDesc.getTableDesc().getProperties())) { return true; } @@ -407,17 +408,17 @@ public class FetchOperator implements Serializable { if (inputFormat instanceof HiveInputFormat) { return StringUtils.escapeString(currPath.toString()); // No need to process here. } - ValidTxnList validTxnList; + ValidWriteIdList validWriteIdList; if (AcidUtils.isInsertOnlyTable(currDesc.getTableDesc().getProperties())) { - validTxnList = extractValidTxnList(); + validWriteIdList = extractValidWriteIdList(); } else { - validTxnList = null; // non-MM case + validWriteIdList = null; // non-MM case } - if (validTxnList != null) { + if (validWriteIdList != null) { Utilities.FILE_OP_LOGGER.info("Processing " + currDesc.getTableName() + " for MM paths"); } - Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, validTxnList); + Path[] dirs = HiveInputFormat.processPathsForMmRead(Lists.newArrayList(currPath), job, validWriteIdList); if (dirs == null || dirs.length == 0) { return null; // No valid inputs. This condition is logged inside the call. 
} @@ -428,10 +429,11 @@ public class FetchOperator implements Serializable { return str.toString(); } - private ValidTxnList extractValidTxnList() { + private ValidWriteIdList extractValidWriteIdList() { if (currDesc.getTableName() == null || !org.apache.commons.lang.StringUtils.isBlank(currDesc.getTableName())) { - String txnString = job.get(ValidTxnList.VALID_TXNS_KEY); - return txnString == null ? new ValidReadTxnList() : new ValidReadTxnList(txnString); + String txnString = job.get(ValidWriteIdList.VALID_WRITEIDS_KEY); + LOG.debug("FetchOperator get writeIdStr: " + txnString); + return txnString == null ? new ValidReaderWriteIdList() : new ValidReaderWriteIdList(txnString); } return null; // not fetching from a table directly but from a temp location } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java index ada4aba..e555aec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchTask.java @@ -56,8 +56,8 @@ public class FetchTask extends Task<FetchWork> implements Serializable { super(); } - public void setValidTxnList(String txnStr) { - fetch.setValidTxnList(txnStr); + public void setValidWriteIdList(String writeIdStr) { + fetch.setValidWriteIdList(writeIdStr); } @Override public void initialize(QueryState queryState, QueryPlan queryPlan, DriverContext ctx, http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java index 98bb938..ff62863 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java @@ -173,7 +173,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements int acidLastBucket = -1; int acidFileOffset = -1; private boolean isMmTable; - private Long txnId; + private Long writeId; private int stmtId; String dpDir; @@ -185,7 +185,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } else { tmpPath = specPath; taskOutputTempPath = null; // Should not be used. - txnId = conf.getTransactionId(); + writeId = conf.getTableWriteId(); stmtId = conf.getStatementId(); } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { @@ -337,7 +337,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } outPaths[filesIdx] = getTaskOutPath(taskId); } else { - String subdirPath = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), txnId, txnId, stmtId); + String subdirPath = AcidUtils.baseOrDeltaSubdir(conf.getInsertOverwrite(), writeId, writeId, stmtId); if (unionPath != null) { // Create the union directory inside the MM directory. 
subdirPath += Path.SEPARATOR + unionPath; @@ -961,7 +961,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) { rowOutWriters[findWriterOffset(row)].write(recordValue); } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) { - fpaths.updaters[findWriterOffset(row)].insert(conf.getTransactionId(), row); + fpaths.updaters[findWriterOffset(row)].insert(conf.getTableWriteId(), row); } else { // TODO I suspect we could skip much of the stuff above this in the function in the case // of update and delete. But I don't understand all of the side effects of the above @@ -1018,9 +1018,9 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } } if (conf.getWriteType() == AcidUtils.Operation.UPDATE) { - fpaths.updaters[writerOffset].update(conf.getTransactionId(), row); + fpaths.updaters[writerOffset].update(conf.getTableWriteId(), row); } else if (conf.getWriteType() == AcidUtils.Operation.DELETE) { - fpaths.updaters[writerOffset].delete(conf.getTransactionId(), row); + fpaths.updaters[writerOffset].delete(conf.getTableWriteId(), row); } else { throw new HiveException("Unknown write type " + conf.getWriteType().toString()); } @@ -1321,8 +1321,8 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements } } if (conf.isMmTable()) { - Utilities.writeMmCommitManifest( - commitPaths, specPath, fs, taskId, conf.getTransactionId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite()); + Utilities.writeMmCommitManifest(commitPaths, specPath, fs, taskId, + conf.getTableWriteId(), conf.getStatementId(), unionPath, conf.getInsertOverwrite()); } // Only publish stats if this operator's flag was set to gather stats if (conf.isGatherStats()) { @@ -1380,7 +1380,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements MissingBucketsContext mbc = new MissingBucketsContext( conf.getTableInfo(), numBuckets, conf.getCompressed()); Utilities.handleMmTableFinalPath(specPath, unionSuffix, hconf, success, - dpLevels, lbLevels, mbc, conf.getTransactionId(), conf.getStatementId(), reporter, + dpLevels, lbLevels, mbc, conf.getTableWriteId(), conf.getStatementId(), reporter, conf.isMmTable(), conf.isMmCtas(), conf.getInsertOverwrite()); } } http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java index e092795..b3c62ad 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitTask.java @@ -34,7 +34,7 @@ public class ImportCommitTask extends Task<ImportCommitWork> { @Override public int execute(DriverContext driverContext) { if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { - Utilities.FILE_OP_LOGGER.trace("Executing ImportCommit for " + work.getTxnId()); + Utilities.FILE_OP_LOGGER.trace("Executing ImportCommit for " + work.getWriteId()); } try { http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java index 649b8e6..a119250 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ImportCommitWork.java @@ -26,18 +26,18 @@ import org.apache.hadoop.hive.ql.plan.Explain.Level; public class ImportCommitWork implements Serializable { private static final long serialVersionUID = 1L; private String dbName, tblName; - private long txnId; + private long writeId; private int stmtId; - public ImportCommitWork(String dbName, String tblName, long txnId, int stmtId) { - this.txnId = txnId; + public ImportCommitWork(String dbName, String tblName, long writeId, int stmtId) { + this.writeId = writeId; this.stmtId = stmtId; this.dbName = dbName; this.tblName = tblName; } - public long getTxnId() { - return txnId; + public long getWriteId() { + return writeId; } public int getStmtId() { http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index 40eb659..b490325 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -369,7 +369,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { } db.loadTable(tbd.getSourcePath(), tbd.getTable().getTableName(), tbd.getLoadFileType(), work.isSrcLocal(), isSkewedStoredAsDirs(tbd), isFullAcidOp, hasFollowingStatsTask(), - tbd.getTxnId(), tbd.getStmtId()); + tbd.getWriteId(), tbd.getStmtId()); if (work.getOutputs() != null) { DDLTask.addIfAbsentByName(new WriteEntity(table, getWriteType(tbd, work.getLoadTableWork().getWriteType())), work.getOutputs()); @@ -469,7 +469,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { tbd.getInheritTableSpecs(), isSkewedStoredAsDirs(tbd), work.isSrcLocal(), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && !tbd.isMmTable(), - hasFollowingStatsTask(), tbd.getTxnId(), tbd.getStmtId()); + hasFollowingStatsTask(), tbd.getWriteId(), tbd.getStmtId()); Partition partn = db.getPartition(table, tbd.getPartitionSpec(), false); // See the comment inside updatePartitionBucketSortColumns. @@ -512,7 +512,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { (tbd.getLbCtx() == null) ? 
0 : tbd.getLbCtx().calculateListBucketingLevel(), work.getLoadTableWork().getWriteType() != AcidUtils.Operation.NOT_ACID && !tbd.isMmTable(), - work.getLoadTableWork().getTxnId(), + work.getLoadTableWork().getWriteId(), tbd.getStmtId(), hasFollowingStatsTask(), work.getLoadTableWork().getWriteType(), http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java index 4732da4..bfdb7d2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SMBMapJoinOperator.java @@ -211,6 +211,7 @@ public class SMBMapJoinOperator extends AbstractMapJoinOperator<SMBJoinDesc> imp AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().isTranscationalTable(), ts.getConf().getAcidOperationalProperties()); + AcidUtils.setValidWriteIdList(jobClone, ts.getConf()); ts.passExecContext(getExecContext()); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java index 8248442..fd84231 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java @@ -100,7 +100,7 @@ import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.common.JavaUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.StringInternUtils; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; @@ -4393,7 +4393,7 @@ public final class Utilities { * if the entire directory is valid (has no uncommitted/temporary files). */ public static List<Path> getValidMmDirectoriesFromTableOrPart(Path path, Configuration conf, - ValidTxnList validTxnList, int lbLevels) throws IOException { + ValidWriteIdList validWriteIdList, int lbLevels) throws IOException { Utilities.FILE_OP_LOGGER.trace("Looking for valid MM paths under {}", path); // NULL means this directory is entirely valid. 
List<Path> result = null; @@ -4403,8 +4403,8 @@ public final class Utilities { for (int i = 0; i < children.length; ++i) { FileStatus file = children[i]; Path childPath = file.getPath(); - Long txnId = JavaUtils.extractTxnId(childPath); - if (!file.isDirectory() || txnId == null || !validTxnList.isTxnValid(txnId)) { + Long writeId = JavaUtils.extractWriteId(childPath); + if (!file.isDirectory() || writeId == null || !validWriteIdList.isWriteIdValid(writeId)) { Utilities.FILE_OP_LOGGER.debug("Skipping path {}", childPath); if (result == null) { result = new ArrayList<>(children.length - 1); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java index 30bf534..4bc7568 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/mr/MapredLocalTask.java @@ -487,6 +487,7 @@ public class MapredLocalTask extends Task<MapredLocalWork> implements Serializab AcidUtils.setAcidOperationalProperties(jobClone, ts.getConf().isTranscationalTable(), ts.getConf().getAcidOperationalProperties()); + AcidUtils.setValidWriteIdList(jobClone, ts.getConf()); // create a fetch operator FetchOperator fetchOp = new FetchOperator(entry.getValue(), jobClone); http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java index 65eb434..1ed35b3 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidInputFormat.java @@ -20,7 +20,7 @@ package org.apache.hadoop.hive.ql.io; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; @@ -109,8 +109,8 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE> extends InputFormat<KEY, VALUE>, InputFormatChecker { static final class DeltaMetaData implements Writable { - private long minTxnId; - private long maxTxnId; + private long minWriteId; + private long maxWriteId; private List<Integer> stmtIds; //would be useful to have enum for Type: insert/delete/load data @@ -120,27 +120,27 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE> /** * @param stmtIds delta dir suffixes when a single txn writes > 1 delta in the same partition */ - DeltaMetaData(long minTxnId, long maxTxnId, List<Integer> stmtIds) { - this.minTxnId = minTxnId; - this.maxTxnId = maxTxnId; + DeltaMetaData(long minWriteId, long maxWriteId, List<Integer> stmtIds) { + this.minWriteId = minWriteId; + this.maxWriteId = maxWriteId; if (stmtIds == null) { throw new IllegalArgumentException("stmtIds == null"); } this.stmtIds = stmtIds; } - long getMinTxnId() { - return minTxnId; + long getMinWriteId() { + return minWriteId; } - long getMaxTxnId() { - return maxTxnId; + long getMaxWriteId() { + return maxWriteId; } 
List<Integer> getStmtIds() { return stmtIds; } @Override public void write(DataOutput out) throws IOException { - out.writeLong(minTxnId); - out.writeLong(maxTxnId); + out.writeLong(minWriteId); + out.writeLong(maxWriteId); out.writeInt(stmtIds.size()); for(Integer id : stmtIds) { out.writeInt(id); @@ -148,8 +148,8 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE> } @Override public void readFields(DataInput in) throws IOException { - minTxnId = in.readLong(); - maxTxnId = in.readLong(); + minWriteId = in.readLong(); + maxWriteId = in.readLong(); stmtIds.clear(); int numStatements = in.readInt(); for(int i = 0; i < numStatements; i++) { @@ -159,7 +159,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE> @Override public String toString() { //? is Type - when implemented - return "Delta(?," + minTxnId + "," + maxTxnId + "," + stmtIds + ")"; + return "Delta(?," + minWriteId + "," + maxWriteId + "," + stmtIds + ")"; } } /** @@ -227,7 +227,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE> * @param collapseEvents should the ACID events be collapsed so that only * the last version of the row is kept. * @param bucket the bucket to read - * @param validTxnList the list of valid transactions to use + * @param validWriteIdList the list of valid write ids to use * @param baseDirectory the base directory to read or the root directory for * old style files * @param deltaDirectory a list of delta files to include in the merge @@ -237,7 +237,7 @@ public interface AcidInputFormat<KEY extends WritableComparable, VALUE> RawReader<VALUE> getRawReader(Configuration conf, boolean collapseEvents, int bucket, - ValidTxnList validTxnList, + ValidWriteIdList validWriteIdList, Path baseDirectory, Path[] deltaDirectory ) throws IOException; http://git-wip-us.apache.org/repos/asf/hive/blob/cbb9233a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java index 26d4dc6..05beafe 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidOutputFormat.java @@ -49,8 +49,8 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO private boolean isCompressed = false; private Properties properties; private Reporter reporter; - private long minimumTransactionId; - private long maximumTransactionId; + private long minimumWriteId; + private long maximumWriteId; private int bucketId; /** * Based on {@link org.apache.hadoop.hive.ql.metadata.Hive#mvFile(HiveConf, FileSystem, Path, FileSystem, Path, boolean, boolean)} @@ -156,22 +156,22 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO } /** - * The minimum transaction id that is included in this file. - * @param min minimum transaction id + * The minimum write id that is included in this file. + * @param min minimum write id * @return this */ - public Options minimumTransactionId(long min) { - this.minimumTransactionId = min; + public Options minimumWriteId(long min) { + this.minimumWriteId = min; return this; } /** - * The maximum transaction id that is included in this file. - * @param max maximum transaction id + * The maximum write id that is included in this file. 
+ * @param max maximum write id * @return this */ - public Options maximumTransactionId(long max) { - this.maximumTransactionId = max; + public Options maximumWriteId(long max) { + this.maximumWriteId = max; return this; } @@ -236,7 +236,7 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO */ public Options statementId(int id) { if(id >= AcidUtils.MAX_STATEMENTS_PER_TXN) { - throw new RuntimeException("Too many statements for transactionId: " + maximumTransactionId); + throw new RuntimeException("Too many statements for writeId: " + maximumWriteId); } if(id < -1) { throw new IllegalArgumentException("Illegal statementId value: " + id); @@ -277,12 +277,12 @@ public interface AcidOutputFormat<K extends WritableComparable, V> extends HiveO return reporter; } - public long getMinimumTransactionId() { - return minimumTransactionId; + public long getMinimumWriteId() { + return minimumWriteId; } - public long getMaximumTransactionId() { - return maximumTransactionId; + public long getMaximumWriteId() { + return maximumWriteId; } public boolean isWritingBase() {
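
Editor's note: the hunks above swap transaction ids for table write ids in the writer options (AcidOutputFormat.Options) and in the MM directory filtering in Utilities. The plain-Java sketch below is not part of the patch; it only illustrates how the renamed surface fits together. The class name WriteIdSketch, both helper methods, and the use of an Options(Configuration) constructor are assumptions for illustration.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.JavaUtils;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.ql.io.AcidOutputFormat;

public class WriteIdSketch {

  // Mirrors the filtering loop in Utilities.getValidMmDirectoriesFromTableOrPart:
  // keep only child directories whose path encodes a write id that the
  // ValidWriteIdList snapshot considers valid.
  static List<Path> validWriteIdDirs(FileSystem fs, Path root,
      ValidWriteIdList validWriteIdList) throws IOException {
    List<Path> result = new ArrayList<>();
    for (FileStatus child : fs.listStatus(root)) {
      Long writeId = JavaUtils.extractWriteId(child.getPath());
      if (child.isDirectory() && writeId != null
          && validWriteIdList.isWriteIdValid(writeId)) {
        result.add(child.getPath());
      }
    }
    return result;
  }

  // Builds writer options keyed by a table write id rather than a transaction id.
  // Assumes the usual Options(Configuration) constructor and that statementId()
  // returns this, as minimumWriteId()/maximumWriteId() do in the hunks above.
  static AcidOutputFormat.Options writerOptions(Configuration conf, long writeId,
      int stmtId) {
    return new AcidOutputFormat.Options(conf)
        .minimumWriteId(writeId)
        .maximumWriteId(writeId)
        .statementId(stmtId);
  }
}

For a single-statement write, the minimum and maximum write id are the same value, which is why the sketch passes writeId to both setters.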
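
Editor's note: FileSinkOperator now passes conf.getTableWriteId() to the record updater for inserts, updates, and deletes. The following hypothetical helper (not in the patch) condenses that delegation pattern; the updater elements are org.apache.hadoop.hive.ql.io.RecordUpdater instances, and the AcidUtils.Operation values match those used in the hunks above.

import java.io.IOException;

import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.RecordUpdater;

public class WriteIdDispatchSketch {

  // Delegates one row to the RecordUpdater using the table write id, mirroring
  // the INSERT/UPDATE/DELETE branches in FileSinkOperator.
  static void applyRow(RecordUpdater updater, AcidUtils.Operation writeType,
      long tableWriteId, Object row) throws IOException {
    switch (writeType) {
      case INSERT:
        updater.insert(tableWriteId, row);
        break;
      case UPDATE:
        updater.update(tableWriteId, row);
        break;
      case DELETE:
        updater.delete(tableWriteId, row);
        break;
      default:
        throw new IllegalArgumentException("Unknown write type " + writeType);
    }
  }
}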