yarn-7346.07.patch
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5e37ca5b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5e37ca5b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5e37ca5b Branch: refs/heads/HDFS-7240 Commit: 5e37ca5bb49f945e27f49a413d08baab562dfa9c Parents: 6e6945c Author: Haibo Chen <haiboc...@apache.org> Authored: Wed Feb 21 20:59:41 2018 -0800 Committer: Haibo Chen <haiboc...@apache.org> Committed: Wed Feb 28 21:04:54 2018 -0800 ---------------------------------------------------------------------- .../resources/assemblies/hadoop-yarn-dist.xml | 2 +- hadoop-project/pom.xml | 68 +- .../pom.xml | 78 +- .../storage/flow/TestHBaseStorageFlowRun.java | 19 +- .../flow/TestHBaseStorageFlowRunCompaction.java | 103 +-- .../pom.xml | 6 +- .../pom.xml | 186 +++++ .../src/assembly/coprocessor.xml | 38 + .../common/HBaseTimelineServerUtils.java | 222 ++++++ .../storage/common/package-info.java | 28 + .../storage/flow/FlowRunCoprocessor.java | 278 +++++++ .../storage/flow/FlowScanner.java | 723 +++++++++++++++++++ .../storage/flow/FlowScannerOperation.java | 46 ++ .../storage/flow/package-info.java | 29 + .../timelineservice/storage/package-info.java | 28 + .../pom.xml | 194 +++++ .../src/assembly/coprocessor.xml | 38 + .../common/HBaseTimelineServerUtils.java | 224 ++++++ .../storage/common/package-info.java | 28 + .../storage/flow/FlowRunCoprocessor.java | 285 ++++++++ .../storage/flow/FlowScanner.java | 723 +++++++++++++++++++ .../storage/flow/FlowScannerOperation.java | 46 ++ .../storage/flow/package-info.java | 29 + .../timelineservice/storage/package-info.java | 28 + .../pom.xml | 160 +--- .../src/assembly/coprocessor.xml | 37 - .../common/HBaseTimelineServerUtils.java | 135 ---- .../storage/common/package-info.java | 28 - .../storage/flow/FlowRunCoprocessor.java | 277 ------- .../storage/flow/FlowScanner.java | 723 ------------------- .../storage/flow/FlowScannerOperation.java | 
46 -- .../storage/flow/package-info.java | 29 - .../timelineservice/storage/package-info.java | 28 - 33 files changed, 3391 insertions(+), 1521 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml ---------------------------------------------------------------------- diff --git a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml index 2c266b6..382c967 100644 --- a/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml +++ b/hadoop-assemblies/src/main/resources/assemblies/hadoop-yarn-dist.xml @@ -236,7 +236,7 @@ </moduleSet> <moduleSet> <includes> - <include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server</include> + <include>org.apache.hadoop:${hbase-server-artifactid}</include> </includes> <binaries> <outputDirectory>share/hadoop/${hadoop.component}/timelineservice</outputDirectory> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-project/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index 7cc68bb..d23a548 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -49,9 +49,6 @@ <xerces.jdiff.version>2.11.0</xerces.jdiff.version> <kafka.version>0.8.2.1</kafka.version> - <hbase.version>1.2.6</hbase.version> - <hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version> - <hbase-compatible-guava.version>11.0.2</hbase-compatible-guava.version> <hadoop.assemblies.version>${project.version}</hadoop.assemblies.version> <commons-daemon.version>1.0.13</commons-daemon.version> @@ -407,12 +404,6 @@ <version>${project.version}</version> </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - 
<artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId> - <version>${project.version}</version> - </dependency> - <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-yarn-applications-distributedshell</artifactId> @@ -666,7 +657,6 @@ <artifactId>jsp-api</artifactId> <version>2.1</version> </dependency> - <dependency> <groupId>org.glassfish</groupId> <artifactId>javax.servlet</artifactId> @@ -1839,6 +1829,64 @@ </plugins> </build> </profile> + <!-- The profile for building against HBase 1.2.x + This is the default. + --> + <profile> + <id>hbase1</id> + <activation> + <property> + <name>!hbase.profile</name> + </property> + </activation> + <properties> + <hbase.version>1.2.6</hbase.version> + <hbase-compatible-hadoop.version>2.5.1</hbase-compatible-hadoop.version> + <hbase-compatible-guava.version>11.0.2</hbase-compatible-guava.version> + <hbase-server-artifactid>hadoop-yarn-server-timelineservice-hbase-server-1</hbase-server-artifactid> + </properties> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>${hbase-server-artifactid}</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + </dependencyManagement> + </profile> + <!-- The profile for building against HBase 2.0.0. 
+ Activate using: mvn -Dhbase.profile=2.0 + --> + <profile> + <id>hbase2</id> + <activation> + <property> + <name>hbase.profile</name> + <value>2.0</value> + </property> + </activation> + <properties> + <hbase.version>2.0.0-beta-1</hbase.version> + <hbase-compatible-hadoop.version>3.0.0</hbase-compatible-hadoop.version> + <hbase-compatible-guava.version>11.0.2</hbase-compatible-guava.version> + <hbase-server-artifactid>hadoop-yarn-server-timelineservice-hbase-server-2</hbase-server-artifactid> + </properties> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>${hbase-server-artifactid}</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.jruby.jcodings</groupId> + <artifactId>jcodings</artifactId> + <version>1.0.13</version> + </dependency> + </dependencies> + </dependencyManagement> + </profile> </profiles> <repositories> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml index d9f992d..2c8d5dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/pom.xml @@ -27,7 +27,7 @@ <modelVersion>4.0.0</modelVersion> <artifactId>hadoop-yarn-server-timelineservice-hbase-tests</artifactId> <version>3.2.0-SNAPSHOT</version> - <name>Apache Hadoop YARN Timeline Service HBase tests</name> + <name>Apache Hadoop YARN TimelineService HBase tests</name> <properties> <!-- Needed for generating FindBugs warnings 
using parent pom --> @@ -84,18 +84,6 @@ <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId> - <scope>test</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-common</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-common</artifactId> <version>${hbase-compatible-hadoop.version}</version> <scope>test</scope> @@ -414,4 +402,68 @@ </plugin> </plugins> </build> + + <profiles> + <profile> + <id>hbase1</id> + <activation> + <property> + <name>!hbase.profile</name> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice-hbase-server-1</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + </profile> + + <profile> + <id>hbase2</id> + <activation> + <property> + <name>hbase.profile</name> + <value>2.0</value> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice-hbase-server-2</artifactId> + <scope>test</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + </exclusions> + </dependency> + <!-- 'mvn dependency:analyze' fails to detect use of this direct + dependency --> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs-client</artifactId> + <version>${hbase-compatible-hadoop.version}</version> + <scope>test</scope> + </dependency> + <!-- 'mvn dependency:analyze' fails to detect use of this direct + dependency --> + <!-- This is needed by HBaseTestingUtility --> + <dependency> + <groupId>org.mockito</groupId> + 
<artifactId>mockito-all</artifactId> + <scope>test</scope> + </dependency> + </dependencies> + </profile> + + </profiles> </project> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index 622b0eb..c7d0d4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -25,7 +25,6 @@ import static org.junit.Assert.fail; import java.io.IOException; import java.util.EnumSet; -import java.util.List; import java.util.Map; import java.util.Set; @@ -41,7 +40,6 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; @@ -64,6 +62,7 @@ import 
org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTableRW; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTableRW; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -89,7 +88,7 @@ public class TestHBaseStorageFlowRun { } @Test - public void checkCoProcessorOff() throws IOException, InterruptedException { + public void checkCoProcessorOff() throws Exception, InterruptedException { Configuration hbaseConf = util.getConfiguration(); TableName table = BaseTableRW.getTableName(hbaseConf, FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME); @@ -127,19 +126,9 @@ public class TestHBaseStorageFlowRun { } private void checkCoprocessorExists(TableName table, boolean exists) - throws IOException, InterruptedException { + throws Exception { HRegionServer server = util.getRSForFirstRegionInTable(table); - List<Region> regions = server.getOnlineRegions(table); - for (Region region : regions) { - boolean found = false; - Set<String> coprocs = region.getCoprocessorHost().getCoprocessors(); - for (String coprocName : coprocs) { - if (coprocName.contains("FlowRunCoprocessor")) { - found = true; - } - } - assertEquals(found, exists); - } + HBaseTimelineServerUtils.validateFlowRunCoprocessor(server, table, exists); } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java ---------------------------------------------------------------------- diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 31be285..2ff37af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; @@ -44,9 +45,7 @@ import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -327,20 +326,15 @@ public class TestHBaseStorageFlowRunCompaction { } // check in flow run table - HRegionServer server = util.getRSForFirstRegionInTable( - 
BaseTableRW.getTableName(c1, FlowRunTableRW.TABLE_NAME_CONF_NAME, - FlowRunTableRW.DEFAULT_TABLE_NAME)); - List<Region> regions = server.getOnlineRegions( - BaseTableRW.getTableName(c1, - FlowRunTableRW.TABLE_NAME_CONF_NAME, - FlowRunTableRW.DEFAULT_TABLE_NAME)); - assertTrue("Didn't find any regions for primary table!", - regions.size() > 0); + TableName flowRunTable = BaseTableRW.getTableName(c1, + FlowRunTableRW.TABLE_NAME_CONF_NAME, FlowRunTableRW.DEFAULT_TABLE_NAME); + HRegionServer server = util.getRSForFirstRegionInTable(flowRunTable); + // flush and compact all the regions of the primary table - for (Region region : regions) { - region.flush(true); - region.compact(true); - } + int regionNum = HBaseTimelineServerUtils.flushCompactTableRegions( + server, flowRunTable); + assertTrue("Didn't find any regions for primary table!", + regionNum > 0); // check flow run for one flow many apps checkFlowRunTable(cluster, user, flow, runid, c1, 4); @@ -392,13 +386,10 @@ public class TestHBaseStorageFlowRunCompaction { private FlowScanner getFlowScannerForTestingCompaction() { // create a FlowScanner object with the sole purpose of invoking a process // summation; - CompactionRequest request = new CompactionRequest(); - request.setIsMajor(true, true); // okay to pass in nulls for the constructor arguments // because all we want to do is invoke the process summation FlowScanner fs = new FlowScanner(null, null, - (request.isMajor() ? 
FlowScannerOperation.MAJOR_COMPACTION - : FlowScannerOperation.MINOR_COMPACTION)); + FlowScannerOperation.MAJOR_COMPACTION); assertNotNull(fs); return fs; } @@ -423,40 +414,45 @@ public class TestHBaseStorageFlowRunCompaction { SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); List<Tag> tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + Tag t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_1234588888_91188"); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL Cell c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell1Ts, Bytes.toBytes(cellValue1), tagByteArray); currentColumnCells.add(c1); tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_12700000001_29102"); tags.add(t); - tagByteArray = Tag.fromList(tags); + tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a recent timestamp and attribute SUM_FINAL Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell2Ts, Bytes.toBytes(cellValue2), tagByteArray); currentColumnCells.add(c2); tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_191780000000001_8195"); tags.add(t); - tagByteArray = Tag.fromList(tags); + tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp but has attribute SUM Cell c3 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell3Ts, Bytes.toBytes(cellValue3), tagByteArray); currentColumnCells.add(c3); 
tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_191780000000001_98104"); tags.add(t); - tagByteArray = Tag.fromList(tags); + tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp but has attribute SUM Cell c4 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cell4Ts, Bytes.toBytes(cellValue4), tagByteArray); @@ -523,10 +519,12 @@ public class TestHBaseStorageFlowRunCompaction { // insert SUM_FINAL cells for (int i = 0; i < count; i++) { tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_123450000" + i + "01_19" + i); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); @@ -537,10 +535,12 @@ public class TestHBaseStorageFlowRunCompaction { // add SUM cells for (int i = 0; i < count; i++) { tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_1987650000" + i + "83_911" + i); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with attribute SUM c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); @@ -614,10 +614,12 @@ public class TestHBaseStorageFlowRunCompaction { // insert SUM_FINAL cells which will expire for (int i = 0; i < 
countFinal; i++) { tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_123450000" + i + "01_19" + i); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinal, Bytes.toBytes(cellValueFinal), tagByteArray); @@ -628,10 +630,12 @@ public class TestHBaseStorageFlowRunCompaction { // insert SUM_FINAL cells which will NOT expire for (int i = 0; i < countFinalNotExpire; i++) { tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_123450000" + i + "01_19" + i); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp and attribute SUM_FINAL c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsFinalNotExpire, Bytes.toBytes(cellValueFinal), tagByteArray); @@ -642,10 +646,12 @@ public class TestHBaseStorageFlowRunCompaction { // add SUM cells for (int i = 0; i < countNotFinal; i++) { tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_1987650000" + i + "83_911" + i); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with attribute SUM c1 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, aQualifier, cellTsNotFinal, Bytes.toBytes(cellValueNotFinal), tagByteArray); @@ -697,10 +703,12 @@ public 
class TestHBaseStorageFlowRunCompaction { long cellValue2 = 28L; List<Tag> tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + Tag t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_1234588888_999888"); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); // create a cell with a VERY old timestamp and attribute SUM_FINAL @@ -709,10 +717,11 @@ public class TestHBaseStorageFlowRunCompaction { currentColumnCells.add(c1); tags = new ArrayList<>(); - t = new Tag(AggregationOperation.SUM.getTagType(), + t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_100000000001_119101"); tags.add(t); - tagByteArray = Tag.fromList(tags); + tagByteArray = HBaseTimelineServerUtils.convertTagListToByteArray(tags); // create a cell with a VERY old timestamp but has attribute SUM Cell c2 = HBaseTimelineServerUtils.createNewCell(aRowKey, aFamily, @@ -755,10 +764,12 @@ public class TestHBaseStorageFlowRunCompaction { // note down the current timestamp long currentTimestamp = System.currentTimeMillis(); List<Tag> tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM_FINAL.getTagType(), + Tag t = HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM_FINAL.getTagType(), "application_123458888888_999888"); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); // create a cell with a VERY old timestamp @@ -793,10 +804,12 @@ public class TestHBaseStorageFlowRunCompaction { // try for 1 cell with tag SUM List<Tag> tags = new ArrayList<>(); - Tag t = new Tag(AggregationOperation.SUM.getTagType(), + Tag t = 
HBaseTimelineServerUtils.createTag( + AggregationOperation.SUM.getTagType(), "application_123458888888_999888"); tags.add(t); - byte[] tagByteArray = Tag.fromList(tags); + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); SortedSet<Cell> currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml index a1db497..3602f02 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/pom.xml @@ -40,8 +40,8 @@ </dependency> <dependency> - <groupId>commons-logging</groupId> - <artifactId>commons-logging</artifactId> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> </dependency> <dependency> @@ -93,7 +93,7 @@ <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId> + <artifactId>hadoop-yarn-server-common</artifactId> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/pom.xml 
---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/pom.xml new file mode 100644 index 0000000..df7c5e3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/pom.xml @@ -0,0 +1,186 @@ +<?xml version="1.0" encoding="UTF-8"?> +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+--> +<project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 + http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>hadoop-yarn-server-timelineservice-hbase-server</artifactId> + <groupId>org.apache.hadoop</groupId> + <version>3.2.0-SNAPSHOT</version> + </parent> + + <modelVersion>4.0.0</modelVersion> + <artifactId>hadoop-yarn-server-timelineservice-hbase-server-1</artifactId> + <name>Apache Hadoop YARN TimelineService HBase Server 1.2</name> + <version>3.2.0-SNAPSHOT</version> + + <properties> + <!-- Needed for generating FindBugs warnings using parent pom --> + <yarn.basedir>${project.parent.parent.parent.parent.basedir}</yarn.basedir> + </properties> + + <profiles> + <profile> + <id>default</id> + <activation> + <activeByDefault>true</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-compiler-plugin</artifactId> + <executions> + <execution> + <id>default-compile</id> + <phase>none</phase> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + + <profile> + <id>hbase1</id> + <activation> + <property> + <name>!hbase.profile</name> + </property> + </activation> + <dependencies> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-server-timelineservice-hbase-common</artifactId> + </dependency> + + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + + <dependency> + <groupId>com.google.guava</groupId> + <artifactId>guava</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-annotations</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + 
<groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-api</artifactId> + <scope>provided</scope> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-common</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-client</artifactId> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>org.apache.hbase</groupId> + <artifactId>hbase-server</artifactId> + <scope>provided</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-hdfs-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-util</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty-sslengine</artifactId> + </exclusion> + </exclusions> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <artifactId>maven-assembly-plugin</artifactId> + <configuration> + <descriptor>src/assembly/coprocessor.xml</descriptor> + <attach>true</attach> + </configuration> + <executions> + <execution> + <id>create-coprocessor-jar</id> + 
<phase>prepare-package</phase> + <goals> + <goal>single</goal> + </goals> + </execution> + </executions> + </plugin> + </plugins> + </build> + </profile> + </profiles> +</project> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/assembly/coprocessor.xml ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/assembly/coprocessor.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/assembly/coprocessor.xml new file mode 100644 index 0000000..dd53bf2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/assembly/coprocessor.xml @@ -0,0 +1,38 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and + limitations under the License. +--> +<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.3 + http://maven.apache.org/xsd/assembly-1.1.3.xsd"> + <id>coprocessor</id> + <formats> + <format>jar</format> + </formats> + <includeBaseDirectory>false</includeBaseDirectory> + <dependencySets> + <dependencySet> + <outputDirectory>/</outputDirectory> + <useProjectArtifact>true</useProjectArtifact> + <unpack>true</unpack> + <scope>runtime</scope> + <includes> + <include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-common</include> + <include>org.apache.hadoop:hadoop-yarn-server-timelineservice-hbase-server-1</include> + </includes> + </dependencySet> + </dependencySets> +</assembly> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java new file mode 100644 index 0000000..3a9e259 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/HBaseTimelineServerUtils.java @@ -0,0 +1,222 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ + +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; + +import java.io.IOException; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * A utility class used by hbase-server module. + */ +public final class HBaseTimelineServerUtils { + private HBaseTimelineServerUtils() { + } + + /** + * Creates a {@link Tag} from the input attribute. + * + * @param attribute Attribute from which tag has to be fetched. + * @return a HBase Tag, or null if the attribute key matches neither kind. + */ + public static Tag getTagFromAttribute(Map.Entry<String, byte[]> attribute) { + // attribute could be either an Aggregation Operation or + // an Aggregation Dimension + // Get the Tag type from either + AggregationOperation aggOp = AggregationOperation + .getAggregationOperation(attribute.getKey()); + if (aggOp != null) { + Tag t = createTag(aggOp.getTagType(), attribute.getValue()); + return t; + } + + AggregationCompactionDimension aggCompactDim = + AggregationCompactionDimension.getAggregationCompactionDimension( + attribute.getKey()); + if (aggCompactDim != null) { + Tag t = createTag(aggCompactDim.getTagType(), attribute.getValue()); + return t; + } + return null; + } + + /** + * Creates a new Put cell based on the input cell but with the new value. + * + * @param origCell Original cell + * @param newValue new cell value + * @return cell + * @throws IOException while creating new cell.
+ */ + public static Cell createNewCell(Cell origCell, byte[] newValue) + throws IOException { + return CellUtil.createCell(CellUtil.cloneRow(origCell), + CellUtil.cloneFamily(origCell), CellUtil.cloneQualifier(origCell), + origCell.getTimestamp(), KeyValue.Type.Put.getCode(), newValue); + } + + /** + * Creates a Put cell with the given inputs. + * + * @param row row of the cell to be created + * @param family column family name of the new cell + * @param qualifier qualifier for the new cell + * @param ts timestamp of the new cell + * @param newValue value of the new cell + * @param tags tags in the new cell + * @return cell + * @throws IOException while creating the cell. + */ + public static Cell createNewCell(byte[] row, byte[] family, byte[] qualifier, + long ts, byte[] newValue, byte[] tags) throws IOException { + return CellUtil.createCell(row, family, qualifier, ts, KeyValue.Type.Put, + newValue, tags); + } + + /** + * Create a Tag. + * @param tagType tag type + * @param tag the content of the tag in byte array. + * @return an instance of Tag + */ + public static Tag createTag(byte tagType, byte[] tag) { + return new Tag(tagType, tag); + } + + /** + * Create a Tag. + * @param tagType tag type + * @param tag the content of the tag in String. + * @return an instance of Tag + */ + public static Tag createTag(byte tagType, String tag) { + return createTag(tagType, Bytes.toBytes(tag)); + } + + /** + * Convert a cell to a list of tags. + * @param cell the cell to convert + * @return a list of tags + */ + public static List<Tag> convertCellAsTagList(Cell cell) { + return Tag.asList( + cell.getTagsArray(), cell.getTagsOffset(), cell.getTagsLength()); + } + + /** + * Convert a list of tags to a byte array. + * @param tags the list of tags to convert + * @return byte array representation of the list of tags + */ + public static byte[] convertTagListToByteArray(List<Tag> tags) { + return Tag.fromList(tags); + } + + /** + * Returns the app id from the first APPLICATION_ID tag in the list of tags.
+ * + * @param tags cell tags to be looked into + * @return App Id as the AggregationCompactionDimension, or null if absent + */ + public static String getAggregationCompactionDimension(List<Tag> tags) { + String appId = null; + for (Tag t : tags) { + if (AggregationCompactionDimension.APPLICATION_ID.getTagType() == t + .getType()) { + appId = Bytes.toString(t.getValue()); + return appId; + } + } + return appId; + } + + /** + * Returns the first aggregation operation, in enum declaration order, whose + * tag type matches any of the input tags, or null if none matches. + * + * @param tags list of HBase tags. + * @return the matching AggregationOperation, or null + */ + public static AggregationOperation getAggregationOperationFromTagsList( + List<Tag> tags) { + for (AggregationOperation aggOp : AggregationOperation.values()) { + for (Tag tag : tags) { + if (tag.getType() == aggOp.getTagType()) { + return aggOp; + } + } + } + return null; + } + + // flush and compact all the regions of the primary table + + /** + * Flush and compact all online regions of a table on a region server. + * @param server region server + * @param table the table to flush and compact + * @return the number of regions flushed and compacted + */ + public static int flushCompactTableRegions(HRegionServer server, + TableName table) throws IOException { + List<Region> regions = server.getOnlineRegions(table); + for (Region region : regions) { + region.flush(true); + region.compact(true); + } + return regions.size(); + } + + /** + * Check the existence of FlowRunCoprocessor in the online regions of a table.
+ * @param server region server + * @param table table to check + * @param existenceExpected true if the FlowRunCoprocessor is expected + * to be loaded in the table, false otherwise + * @throws Exception if any region's state differs from existenceExpected + */ + public static void validateFlowRunCoprocessor(HRegionServer server, + TableName table, boolean existenceExpected) throws Exception { + List<Region> regions = server.getOnlineRegions(table); + for (Region region : regions) { + boolean found = false; + Set<String> coprocs = region.getCoprocessorHost().getCoprocessors(); + for (String coprocName : coprocs) { + if (coprocName.contains("FlowRunCoprocessor")) { + found = true; + } + } + if (found != existenceExpected) { + throw new Exception("FlowRunCoprocessor is" + + (existenceExpected ? " not " : " ") + "loaded in table " + table); + } + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java new file mode 100644 index 0000000..0df5b8a --- /dev/null +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/package-info.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Package org.apache.hadoop.yarn.server.timelineservice.storage.common contains + * a set of utility classes used across backend storage reader and writer. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +package org.apache.hadoop.yarn.server.timelineservice.storage.common; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/5e37ca5b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java new file mode 100644 index 0000000..c526f58 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-server/hadoop-yarn-server-timelineservice-hbase-server-1/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.flow; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.NavigableMap; +import java.util.TreeMap; + +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.CoprocessorEnvironment; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.Tag; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver; +import org.apache.hadoop.hbase.coprocessor.ObserverContext; +import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment; +import org.apache.hadoop.hbase.regionserver.Region; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.RegionScanner; +import org.apache.hadoop.hbase.regionserver.ScanType; +import org.apache.hadoop.hbase.regionserver.Store; +import org.apache.hadoop.hbase.regionserver.StoreFile; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; +import org.apache.hadoop.hbase.regionserver.wal.WALEdit; +import org.apache.hadoop.hbase.util.Bytes; +import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineServerUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Coprocessor for flow run table. + */ +public class FlowRunCoprocessor extends BaseRegionObserver { + + private static final Logger LOG = + LoggerFactory.getLogger(FlowRunCoprocessor.class); + + private Region region; + /** + * Generates a timestamp that is unique per row in a region; this is per region. + */ + private final TimestampGenerator timestampGenerator = + new TimestampGenerator(); + + @Override + public void start(CoprocessorEnvironment e) throws IOException { + if (e instanceof RegionCoprocessorEnvironment) { + RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; + this.region = env.getRegion(); + } + } + + /* + * (non-Javadoc) + * + * This method adds the tags onto the cells in the Put. It is presumed that + * all the cells in one Put have the same set of Tags. A cell whose timestamp + * is unset (HConstants.LATEST_TIMESTAMP) gets a new unique timestamp + * generated by {@link TimestampGenerator}; an explicit timestamp is kept. + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#prePut(org.apache + * .hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Put, + * org.apache.hadoop.hbase.regionserver.wal.WALEdit, + * org.apache.hadoop.hbase.client.Durability) + */ + @Override + public void prePut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, + WALEdit edit, Durability durability) throws IOException { + Map<String, byte[]> attributes = put.getAttributesMap(); + // Assumption is that all the cells in a put are the same operation.
+ List<Tag> tags = new ArrayList<>(); + if ((attributes != null) && (attributes.size() > 0)) { + for (Map.Entry<String, byte[]> attribute : attributes.entrySet()) { + Tag t = HBaseTimelineServerUtils.getTagFromAttribute(attribute); + if (t != null) { + tags.add(t); + } + } + byte[] tagByteArray = + HBaseTimelineServerUtils.convertTagListToByteArray(tags); + NavigableMap<byte[], List<Cell>> newFamilyMap = new TreeMap<>( + Bytes.BYTES_COMPARATOR); + for (Map.Entry<byte[], List<Cell>> entry : put.getFamilyCellMap() + .entrySet()) { + List<Cell> newCells = new ArrayList<>(entry.getValue().size()); + for (Cell cell : entry.getValue()) { + // for each cell in the put add the tags + // Assumption is that all the cells in + // one put are the same operation + // also, get a unique cell timestamp for non-metric cells + // this way we don't inadvertently overwrite cell versions + long cellTimestamp = getCellTimestamp(cell.getTimestamp(), tags); + newCells.add(CellUtil.createCell(CellUtil.cloneRow(cell), + CellUtil.cloneFamily(cell), CellUtil.cloneQualifier(cell), + cellTimestamp, KeyValue.Type.Put, CellUtil.cloneValue(cell), + tagByteArray)); + } + newFamilyMap.put(entry.getKey(), newCells); + } // for each entry + // Update the family map for the Put + put.setFamilyCellMap(newFamilyMap); + } + } + + /** + * Determines if the current cell's timestamp is to be used or a new unique + * cell timestamp is to be used. The reason this is done is to avoid + * inadvertently overwriting cells when writes come in very fast. But for + * metric cells, the cell timestamp signifies the metric timestamp. Hence + * we don't want to overwrite it.
+ * + * @param timestamp timestamp currently set on the cell + * @param tags tags of the cell (not used by this method) + * @return the given timestamp, or a generated one if it was unset + */ + private long getCellTimestamp(long timestamp, List<Tag> tags) { + // if ts not set (hbase sets to HConstants.LATEST_TIMESTAMP by default) + // then use the generator + if (timestamp == HConstants.LATEST_TIMESTAMP) { + return timestampGenerator.getUniqueTimestamp(); + } else { + return timestamp; + } + } + + /* + * (non-Javadoc) + * + * Serves the Get through a {@link FlowScanner} so that it can correctly + * process the contents of {@link FlowRunTable}; the default get is bypassed. + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preGetOp(org.apache + * .hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Get, java.util.List) + */ + @Override + public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, + Get get, List<Cell> results) throws IOException { + Scan scan = new Scan(get); + scan.setMaxVersions(); + RegionScanner scanner = null; + try { + scanner = new FlowScanner(e.getEnvironment(), scan, + region.getScanner(scan), FlowScannerOperation.READ); + scanner.next(results); + e.bypass(); + } finally { + if (scanner != null) { + scanner.close(); + } + } + } + + /* + * (non-Javadoc) + * + * Ensures that max versions are set for the Scan so that metrics can be + * correctly aggregated and min/max can be correctly determined.
+ * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#preScannerOpen(org + * .apache.hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Scan, + * org.apache.hadoop.hbase.regionserver.RegionScanner) + */ + @Override + public RegionScanner preScannerOpen( + ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, + RegionScanner scanner) throws IOException { + // set max versions for scan to see all + // versions to aggregate for metrics + scan.setMaxVersions(); + return scanner; + } + + /* + * (non-Javadoc) + * + * Wraps the scanner in a {@link FlowScanner} so that it can correctly process + * the contents of {@link FlowRunTable}. + * + * @see + * org.apache.hadoop.hbase.coprocessor.BaseRegionObserver#postScannerOpen( + * org.apache.hadoop.hbase.coprocessor.ObserverContext, + * org.apache.hadoop.hbase.client.Scan, + * org.apache.hadoop.hbase.regionserver.RegionScanner) + */ + @Override + public RegionScanner postScannerOpen( + ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, + RegionScanner scanner) throws IOException { + return new FlowScanner(e.getEnvironment(), scan, + scanner, FlowScannerOperation.READ); + } + + @Override + public InternalScanner preFlush( + ObserverContext<RegionCoprocessorEnvironment> c, Store store, + InternalScanner scanner) throws IOException { + if (LOG.isDebugEnabled()) { + if (store != null) { + LOG.debug("preFlush store = " + store.getColumnFamilyName() + + " flushableSize=" + store.getFlushableSize() + + " flushedCellsCount=" + store.getFlushedCellsCount() + + " compactedCellsCount=" + store.getCompactedCellsCount() + + " majorCompactedCellsCount=" + + store.getMajorCompactedCellsCount() + " memstoreFlushSize=" + + store.getMemstoreFlushSize() + " memstoreSize=" + + store.getMemStoreSize() + " size=" + store.getSize() + + " storeFilesCount=" + store.getStorefilesCount()); + } + } + return new FlowScanner(c.getEnvironment(), scanner, + FlowScannerOperation.FLUSH); + } + + @Override +
public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, + Store store, StoreFile resultFile) { + if (LOG.isDebugEnabled()) { + if (store != null) { + LOG.debug("postFlush store = " + store.getColumnFamilyName() + + " flushableSize=" + store.getFlushableSize() + + " flushedCellsCount=" + store.getFlushedCellsCount() + + " compactedCellsCount=" + store.getCompactedCellsCount() + + " majorCompactedCellsCount=" + + store.getMajorCompactedCellsCount() + " memstoreFlushSize=" + + store.getMemstoreFlushSize() + " memstoreSize=" + + store.getMemStoreSize() + " size=" + store.getSize() + + " storeFilesCount=" + store.getStorefilesCount()); + } + } + } + + @Override + public InternalScanner preCompact( + ObserverContext<RegionCoprocessorEnvironment> e, Store store, + InternalScanner scanner, ScanType scanType, CompactionRequest request) + throws IOException { + + FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION; + if (request != null) { + requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION + : FlowScannerOperation.MINOR_COMPACTION); + LOG.info("Compactionrequest= " + request.toString() + " " + + requestOp.toString() + " RegionName=" + e.getEnvironment() + .getRegion().getRegionInfo().getRegionNameAsString()); + } + return new FlowScanner(e.getEnvironment(), scanner, requestOp); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org