YARN-5170. Eliminate singleton converters and static method access. (Joep Rottinghuis via Varun Saxena)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/892b193b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/892b193b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/892b193b Branch: refs/heads/trunk Commit: 892b193bd77c15932b4c084c1d525b7017def0d4 Parents: 3832795 Author: Varun Saxena <[email protected]> Authored: Fri Jun 10 21:18:05 2016 +0530 Committer: Sangjin Lee <[email protected]> Committed: Sun Jul 10 08:46:03 2016 -0700 ---------------------------------------------------------------------- .../storage/TestHBaseTimelineStorage.java | 43 ++- .../flow/TestHBaseStorageFlowActivity.java | 11 +- .../storage/flow/TestHBaseStorageFlowRun.java | 11 +- .../flow/TestHBaseStorageFlowRunCompaction.java | 54 +-- .../reader/filter/TimelineFilterUtils.java | 38 --- .../storage/HBaseTimelineWriterImpl.java | 226 ++++++++----- .../storage/application/ApplicationColumn.java | 2 +- .../application/ApplicationColumnPrefix.java | 3 +- .../storage/application/ApplicationRowKey.java | 173 +++++++--- .../application/ApplicationRowKeyConverter.java | 130 -------- .../application/ApplicationRowKeyPrefix.java | 69 ++++ .../storage/apptoflow/AppToFlowRowKey.java | 93 +++++- .../apptoflow/AppToFlowRowKeyConverter.java | 96 ------ .../storage/common/AppIdKeyConverter.java | 11 +- .../storage/common/EventColumnName.java | 15 + .../common/EventColumnNameConverter.java | 12 +- .../storage/common/LongConverter.java | 27 +- .../storage/common/LongKeyConverter.java | 14 +- .../storage/common/RowKeyPrefix.java | 42 +++ .../storage/common/StringKeyConverter.java | 7 +- .../storage/common/TimelineStorageUtils.java | 171 ---------- .../storage/entity/EntityColumn.java | 3 +- .../storage/entity/EntityColumnPrefix.java | 3 +- .../storage/entity/EntityRowKey.java | 187 ++++++++--- .../storage/entity/EntityRowKeyConverter.java | 143 -------- .../storage/entity/EntityRowKeyPrefix.java | 74 +++++ .../storage/flow/FlowActivityRowKey.java | 162 ++++++--- .../flow/FlowActivityRowKeyConverter.java | 115 ------- .../storage/flow/FlowActivityRowKeyPrefix.java | 60 ++++ .../storage/flow/FlowRunColumn.java | 6 +- .../storage/flow/FlowRunColumnPrefix.java | 2 +- .../storage/flow/FlowRunRowKey.java | 129 +++++-- .../storage/flow/FlowRunRowKeyConverter.java | 120 ------- .../storage/flow/FlowRunRowKeyPrefix.java | 54 +++ .../storage/reader/ApplicationEntityReader.java | 103 +++--- .../reader/FlowActivityEntityReader.java | 26 +- .../storage/reader/FlowRunEntityReader.java | 80 ++--- .../storage/reader/GenericEntityReader.java | 333 ++++++++++--------- .../storage/reader/TimelineEntityReader.java | 136 +++++++- .../storage/common/TestKeyConverters.java | 287 ++++------------ .../storage/common/TestRowKeys.java | 244 ++++++++++++++ 41 files changed, 1883 insertions(+), 1632 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java index 7b647eb..fd5a7f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestHBaseTimelineStorage.java @@ -70,12 +70,14 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.application.Applica import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKeyPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.junit.After; import org.junit.AfterClass; @@ -649,8 +651,9 @@ public class TestHBaseTimelineStorage { infoMap.putAll(infoMap1); // retrieve the row - byte[] rowKey = - ApplicationRowKey.getRowKey(cluster, user, flow, runid, appId); + ApplicationRowKey applicationRowKey = + new ApplicationRowKey(cluster, user, flow, runid, appId); + byte[] rowKey = applicationRowKey.getRowKey(); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); Connection conn = ConnectionFactory.createConnection(c1); @@ -674,7 +677,7 @@ public class TestHBaseTimelineStorage { Map<String, Object> infoColumns = ApplicationColumnPrefix.INFO.readResults(result, - StringKeyConverter.getInstance()); + new StringKeyConverter()); assertEquals(infoMap, infoColumns); // Remember isRelatedTo is of type Map<String, Set<String>> @@ -710,15 +713,16 @@ public class TestHBaseTimelineStorage { } } + KeyConverter<String> stringKeyConverter = new StringKeyConverter(); // Configuration Map<String, Object> configColumns = - ApplicationColumnPrefix.CONFIG.readResults(result, - StringKeyConverter.getInstance()); + ApplicationColumnPrefix.CONFIG + .readResults(result, stringKeyConverter); assertEquals(conf, configColumns); NavigableMap<String, NavigableMap<Long, Number>> metricsResult = - ApplicationColumnPrefix.METRIC.readResultsWithTimestamps( - result, StringKeyConverter.getInstance()); + ApplicationColumnPrefix.METRIC.readResultsWithTimestamps(result, + stringKeyConverter); NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); matchMetrics(metricValues, metricMap); @@ -908,7 +912,8 @@ public class TestHBaseTimelineStorage { // scan the table and see that entity exists Scan s = new Scan(); byte[] startRow = - EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName); + new EntityRowKeyPrefix(cluster, user, flow, runid, appName) + .getRowKeyPrefix(); s.setStartRow(startRow); s.setMaxVersions(Integer.MAX_VALUE); Connection conn = ConnectionFactory.createConnection(c1); @@ -916,6 +921,7 @@ public class TestHBaseTimelineStorage { int rowCount = 0; int colCount = 0; + KeyConverter<String> stringKeyConverter = new StringKeyConverter(); for (Result result : scanner) { if (result != null && !result.isEmpty()) { rowCount++; @@ -936,7 +942,7 @@ public class TestHBaseTimelineStorage { Map<String, Object> infoColumns = EntityColumnPrefix.INFO.readResults(result, - StringKeyConverter.getInstance()); + new StringKeyConverter()); assertEquals(infoMap, infoColumns); // Remember isRelatedTo is of type Map<String, Set<String>> @@ -975,13 +981,12 @@ public class TestHBaseTimelineStorage { // Configuration Map<String, Object> configColumns = - EntityColumnPrefix.CONFIG.readResults(result, - StringKeyConverter.getInstance()); + EntityColumnPrefix.CONFIG.readResults(result, stringKeyConverter); assertEquals(conf, configColumns); NavigableMap<String, NavigableMap<Long, Number>> metricsResult = - EntityColumnPrefix.METRIC.readResultsWithTimestamps( - result, StringKeyConverter.getInstance()); + EntityColumnPrefix.METRIC.readResultsWithTimestamps(result, + stringKeyConverter); NavigableMap<Long, Number> metricMap = metricsResult.get(m1.getId()); matchMetrics(metricValues, metricMap); @@ -1116,8 +1121,9 @@ public class TestHBaseTimelineStorage { hbi.stop(); // retrieve the row - byte[] rowKey = - ApplicationRowKey.getRowKey(cluster, user, flow, runid, appName); + ApplicationRowKey applicationRowKey = + new ApplicationRowKey(cluster, user, flow, runid, appName); + byte[] rowKey = applicationRowKey.getRowKey(); Get get = new Get(rowKey); get.setMaxVersions(Integer.MAX_VALUE); Connection conn = ConnectionFactory.createConnection(c1); @@ -1132,7 +1138,7 @@ public class TestHBaseTimelineStorage { Map<EventColumnName, Object> eventsResult = ApplicationColumnPrefix.EVENT.readResults(result, - EventColumnNameConverter.getInstance()); + new EventColumnNameConverter()); // there should be only one event assertEquals(1, eventsResult.size()); for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) { @@ -1212,7 +1218,8 @@ public class TestHBaseTimelineStorage { String appName = ApplicationId.newInstance(System.currentTimeMillis() + 9000000L, 1).toString(); byte[] startRow = - EntityRowKey.getRowKeyPrefix(cluster, user, flow, runid, appName); + new EntityRowKeyPrefix(cluster, user, flow, runid, appName) + .getRowKeyPrefix(); hbi.write(cluster, user, flow, flowVersion, runid, appName, entities); hbi.stop(); // scan the table and see that entity exists @@ -1234,7 +1241,7 @@ public class TestHBaseTimelineStorage { Map<EventColumnName, Object> eventsResult = EntityColumnPrefix.EVENT.readResults(result, - EventColumnNameConverter.getInstance()); + new EventColumnNameConverter()); // there should be only one event assertEquals(1, eventsResult.size()); for (Map.Entry<EventColumnName, Object> e : eventsResult.entrySet()) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java index 589b78d..37490ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowActivity.java @@ -158,7 +158,7 @@ public class TestHBaseStorageFlowActivity { Table table1 = conn.getTable(TableName .valueOf(FlowActivityTable.DEFAULT_TABLE_NAME)); byte[] startRow = - FlowActivityRowKey.getRowKey(cluster, minStartTs, user, flow); + new FlowActivityRowKey(cluster, minStartTs, user, flow).getRowKey(); Get g = new Get(startRow); Result r1 = table1.get(g); assertNotNull(r1); @@ -278,11 +278,12 @@ public class TestHBaseStorageFlowActivity { Scan s = new Scan(); s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); byte[] startRow = - FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow); + new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey(); s.setStartRow(startRow); String clusterStop = cluster + "1"; byte[] stopRow = - FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow); + new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow) + .getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); Table table1 = conn.getTable(TableName @@ -420,11 +421,11 @@ public class TestHBaseStorageFlowActivity { Scan s = new Scan(); s.addFamily(FlowActivityColumnFamily.INFO.getBytes()); byte[] startRow = - FlowActivityRowKey.getRowKey(cluster, appCreatedTime, user, flow); + new FlowActivityRowKey(cluster, appCreatedTime, user, flow).getRowKey(); s.setStartRow(startRow); String clusterStop = cluster + "1"; byte[] stopRow = - FlowActivityRowKey.getRowKey(clusterStop, appCreatedTime, user, flow); + new FlowActivityRowKey(clusterStop, appCreatedTime, user, flow).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); Table table1 = conn.getTable(TableName http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index a443b50..328b25a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -59,8 +59,8 @@ import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilte import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelinePrefixFilter; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; -import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; @@ -224,7 +224,7 @@ public class TestHBaseStorageFlowRun { .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); Get g = new Get(startRow); g.addColumn(FlowRunColumnFamily.INFO.getBytes(), FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); @@ -354,10 +354,11 @@ public class TestHBaseStorageFlowRun { long runid, Configuration c1) throws IOException { Scan s = new Scan(); s.addFamily(FlowRunColumnFamily.INFO.getBytes()); - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); s.setStartRow(startRow); String clusterStop = cluster + "1"; - byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid); + byte[] stopRow = + new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); Table table1 = conn.getTable(TableName @@ -629,7 +630,7 @@ public class TestHBaseStorageFlowRun { .valueOf(FlowRunTable.DEFAULT_TABLE_NAME)); // scan the table and see that we get back the right min and max // timestamps - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); Get g = new Get(startRow); g.addColumn(FlowRunColumnFamily.INFO.getBytes(), FlowRunColumn.MIN_START_TIME.getColumnQualifierBytes()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 6b0ee5c..e1bef53 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -19,24 +19,24 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; -import static org.junit.Assert.assertNotEquals; import java.io.IOException; -import java.util.Map; +import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.SortedSet; import java.util.TreeSet; -import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.Tag; -import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; @@ -46,21 +46,21 @@ import org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriterImpl; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGenerator; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.compactions.CompactionRequest; /** * Tests the FlowRun and FlowActivity Tables @@ -194,10 +194,11 @@ public class TestHBaseStorageFlowRunCompaction { long runid, Configuration c1, int valueCount) throws IOException { Scan s = new Scan(); s.addFamily(FlowRunColumnFamily.INFO.getBytes()); - byte[] startRow = FlowRunRowKey.getRowKey(cluster, user, flow, runid); + byte[] startRow = new FlowRunRowKey(cluster, user, flow, runid).getRowKey(); s.setStartRow(startRow); String clusterStop = cluster + "1"; - byte[] stopRow = FlowRunRowKey.getRowKey(clusterStop, user, flow, runid); + byte[] stopRow = + new FlowRunRowKey(clusterStop, user, flow, runid).getRowKey(); s.setStopRow(stopRow); Connection conn = ConnectionFactory.createConnection(c1); Table table1 = conn.getTable(TableName @@ -302,8 +303,9 @@ public class TestHBaseStorageFlowRunCompaction { cell4Ts, Bytes.toBytes(cellValue4), tagByteArray); currentColumnCells.add(c4); - List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + List<Cell> cells = + fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); assertNotNull(cells); // we should be getting back 4 cells @@ -387,8 +389,9 @@ public class TestHBaseStorageFlowRunCompaction { cellTsNotFinal++; } - List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + List<Cell> cells = + fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); assertNotNull(cells); // we should be getting back count + 1 cells @@ -489,8 +492,9 @@ public class TestHBaseStorageFlowRunCompaction { cellTsNotFinal++; } - List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + List<Cell> cells = + fs.processSummationMajorCompaction(currentColumnCells, + new LongConverter(), currentTimestamp); assertNotNull(cells); // we should be getting back @@ -554,7 +558,7 @@ public class TestHBaseStorageFlowRunCompaction { 130L, Bytes.toBytes(cellValue2), tagByteArray); currentColumnCells.add(c2); List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + new LongConverter(), currentTimestamp); assertNotNull(cells); // we should be getting back two cells @@ -602,7 +606,7 @@ public class TestHBaseStorageFlowRunCompaction { currentColumnCells.add(c1); List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + new LongConverter(), currentTimestamp); assertNotNull(cells); // we should not get the same cell back // but we get back the flow cell @@ -639,7 +643,7 @@ public class TestHBaseStorageFlowRunCompaction { currentTimestamp, Bytes.toBytes(1110L), tagByteArray); currentColumnCells.add(c1); List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + new LongConverter(), currentTimestamp); assertNotNull(cells); // we expect the same cell back assertEquals(1, cells.size()); @@ -653,15 +657,19 @@ public class TestHBaseStorageFlowRunCompaction { FlowScanner fs = getFlowScannerForTestingCompaction(); long currentTimestamp = System.currentTimeMillis(); + LongConverter longConverter = new LongConverter(); + SortedSet<Cell> currentColumnCells = null; - List<Cell> cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + List<Cell> cells = + fs.processSummationMajorCompaction(currentColumnCells, longConverter, + currentTimestamp); assertNotNull(cells); assertEquals(0, cells.size()); currentColumnCells = new TreeSet<Cell>(KeyValue.COMPARATOR); - cells = fs.processSummationMajorCompaction(currentColumnCells, - LongConverter.getInstance(), currentTimestamp); + cells = + fs.processSummationMajorCompaction(currentColumnCells, longConverter, + currentTimestamp); assertNotNull(cells); assertEquals(0, cells.size()); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java index 036746b..cccae26 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/filter/TimelineFilterUtils.java @@ -31,14 +31,9 @@ import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp; import org.apache.hadoop.hbase.filter.Filter; import org.apache.hadoop.hbase.filter.FilterList; import org.apache.hadoop.hbase.filter.FilterList.Operator; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; import org.apache.hadoop.hbase.filter.QualifierFilter; import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; @@ -209,39 +204,6 @@ public final class TimelineFilterUtils { return singleColValFilter; } - private static <T> byte[] createColQualifierPrefix(ColumnPrefix<T> colPrefix, - String column) { - if (colPrefix == ApplicationColumnPrefix.EVENT || - colPrefix == EntityColumnPrefix.EVENT) { - return EventColumnNameConverter.getInstance().encode( - new EventColumnName(column, null, null)); - } else { - return StringKeyConverter.getInstance().encode(column); - } - } - - /** - * Create a filter list of qualifier filters based on passed set of columns. - * - * @param <T> Describes the type of column prefix. - * @param colPrefix Column Prefix. - * @param columns set of column qualifiers. - * @return filter list. - */ - public static <T> FilterList createFiltersFromColumnQualifiers( - ColumnPrefix<T> colPrefix, Set<String> columns) { - FilterList list = new FilterList(Operator.MUST_PASS_ONE); - for (String column : columns) { - // For columns which have compound column qualifiers (eg. events), we need - // to include the required separator. - byte[] compoundColQual = createColQualifierPrefix(colPrefix, column); - list.addFilter(new QualifierFilter(CompareOp.EQUAL, - new BinaryPrefixComparator( - colPrefix.getColumnPrefixBytes(compoundColQual)))); - } - return list; - } - /** * Fetch columns from filter list containing exists and multivalue equality * filters. This is done to fetch only required columns from back-end and http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index f8b5a65..3511a2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; @@ -45,11 +46,10 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlow import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnNameConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; @@ -86,6 +86,17 @@ public class HBaseTimelineWriterImpl extends AbstractService implements private TypedBufferedMutator<FlowActivityTable> flowActivityTable; private TypedBufferedMutator<FlowRunTable> flowRunTable; + /** + * Used to convert strings key components to and from storage format. + */ + private final KeyConverter<String> stringKeyConverter = + new StringKeyConverter(); + + /** + * Used to convert Long key components to and from storage format. + */ + private final KeyConverter<Long> longKeyConverter = new LongKeyConverter(); + public HBaseTimelineWriterImpl() { super(HBaseTimelineWriterImpl.class.getName()); } @@ -138,12 +149,19 @@ public class HBaseTimelineWriterImpl extends AbstractService implements // if the entity is the application, the destination is the application // table - boolean isApplication = TimelineStorageUtils.isApplicationEntity(te); - byte[] rowKey = isApplication ? - ApplicationRowKey.getRowKey(clusterId, userId, flowName, flowRunId, - appId) : - EntityRowKey.getRowKey(clusterId, userId, flowName, flowRunId, appId, - te.getType(), te.getId()); + boolean isApplication = isApplicationEntity(te); + byte[] rowKey; + if (isApplication) { + ApplicationRowKey applicationRowKey = + new ApplicationRowKey(clusterId, userId, flowName, flowRunId, + appId); + rowKey = applicationRowKey.getRowKey(); + } else { + EntityRowKey entityRowKey = + new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, + te.getType(), te.getId()); + rowKey = entityRowKey.getRowKey(); + } storeInfo(rowKey, te, flowVersion, isApplication); storeEvents(rowKey, te.getEvents(), isApplication); @@ -152,102 +170,101 @@ public class HBaseTimelineWriterImpl extends AbstractService implements storeRelations(rowKey, te, isApplication); if (isApplication) { - TimelineEvent event = TimelineStorageUtils.getApplicationEvent(te, - ApplicationMetricsConstants.CREATED_EVENT_TYPE); + TimelineEvent event = + getApplicationEvent(te, + ApplicationMetricsConstants.CREATED_EVENT_TYPE); + FlowRunRowKey flowRunRowKey = + new FlowRunRowKey(clusterId, userId, flowName, flowRunId); if (event != null) { - onApplicationCreated(clusterId, userId, flowName, flowVersion, - flowRunId, appId, te, event.getTimestamp()); + AppToFlowRowKey appToFlowRowKey = + new AppToFlowRowKey(clusterId, appId); + onApplicationCreated(flowRunRowKey, appToFlowRowKey, appId, userId, + flowVersion, te, event.getTimestamp()); } // if it's an application entity, store metrics - storeFlowMetricsAppRunning(clusterId, userId, flowName, flowRunId, - appId, te); + storeFlowMetricsAppRunning(flowRunRowKey, appId, te); // if application has finished, store it's finish time and write final // values of all metrics - event = TimelineStorageUtils.getApplicationEvent(te, + event = getApplicationEvent(te, ApplicationMetricsConstants.FINISHED_EVENT_TYPE); if (event != null) { - onApplicationFinished(clusterId, userId, flowName, flowVersion, - flowRunId, appId, te, event.getTimestamp()); + onApplicationFinished(flowRunRowKey, flowVersion, appId, te, + event.getTimestamp()); } } } return putStatus; } - private void onApplicationCreated(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntity te, long appCreatedTimeStamp) throws IOException { + private void onApplicationCreated(FlowRunRowKey flowRunRowKey, + AppToFlowRowKey appToFlowRowKey, String appId, String userId, + String flowVersion, TimelineEntity te, long appCreatedTimeStamp) + throws IOException { + + String flowName = flowRunRowKey.getFlowName(); + Long flowRunId = flowRunRowKey.getFlowRunId(); + // store in App to flow table - storeInAppToFlowTable(clusterId, userId, flowName, flowRunId, appId, te); + byte[] rowKey = appToFlowRowKey.getRowKey(); + AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); + AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId); + AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId); + // store in flow run table - storeAppCreatedInFlowRunTable(clusterId, userId, flowName, flowVersion, - flowRunId, appId, te); - // store in flow activity table - storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, - flowRunId, appId, appCreatedTimeStamp); - } + storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te); - /* - * updates the {@link FlowActivityTable} with the Application TimelineEntity - * information - */ - private void storeInFlowActivityTable(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - long activityTimeStamp) throws IOException { - byte[] rowKey = FlowActivityRowKey.getRowKey(clusterId, activityTimeStamp, - userId, flowName); - byte[] qualifier = LongKeyConverter.getInstance().encode(flowRunId); - FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, - null, flowVersion, + // store in flow activity table + byte[] flowActivityRowKeyBytes = + new FlowActivityRowKey(flowRunRowKey.getClusterId(), + appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName) + .getRowKey(); + byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); + FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes, + flowActivityTable, qualifier, null, flowVersion, AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); } /* * updates the {@link FlowRunTable} with Application Created information */ - private void storeAppCreatedInFlowRunTable(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntity te) throws IOException { - byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, - flowRunId); + private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey, + String appId, TimelineEntity te) throws IOException { + byte[] rowKey = flowRunRowKey.getRowKey(); FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null, te.getCreatedTime(), AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); } - private void storeInAppToFlowTable(String clusterId, String userId, - String flowName, long flowRunId, String appId, TimelineEntity te) - throws IOException { - byte[] rowKey = AppToFlowRowKey.getRowKey(clusterId, appId); - AppToFlowColumn.FLOW_ID.store(rowKey, appToFlowTable, null, flowName); - AppToFlowColumn.FLOW_RUN_ID.store(rowKey, appToFlowTable, null, flowRunId); - AppToFlowColumn.USER_ID.store(rowKey, appToFlowTable, null, userId); - } /* * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an * application has finished */ - private void onApplicationFinished(String clusterId, String userId, - String flowName, String flowVersion, long flowRunId, String appId, - TimelineEntity te, long appFinishedTimeStamp) throws IOException { + private void onApplicationFinished(FlowRunRowKey flowRunRowKey, + String flowVersion, String appId, TimelineEntity te, + long appFinishedTimeStamp) throws IOException { // store in flow run table - storeAppFinishedInFlowRunTable(clusterId, userId, flowName, flowRunId, - appId, te, appFinishedTimeStamp); + storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te, + appFinishedTimeStamp); // indicate in the flow activity table that the app has finished - storeInFlowActivityTable(clusterId, userId, flowName, flowVersion, - flowRunId, appId, appFinishedTimeStamp); + byte[] rowKey = + new FlowActivityRowKey(flowRunRowKey.getClusterId(), + appFinishedTimeStamp, flowRunRowKey.getUserId(), + flowRunRowKey.getFlowName()).getRowKey(); + byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); + FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, + null, flowVersion, + AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); } /* * Update the {@link FlowRunTable} with Application Finished information */ - private void storeAppFinishedInFlowRunTable(String clusterId, String userId, - String flowName, long flowRunId, String appId, TimelineEntity te, - long appFinishedTimeStamp) throws IOException { - byte[] rowKey = - FlowRunRowKey.getRowKey(clusterId, userId, flowName, flowRunId); + private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey, + String appId, TimelineEntity te, long appFinishedTimeStamp) + throws IOException { + byte[] rowKey = flowRunRowKey.getRowKey(); Attribute attributeAppId = AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId); FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null, @@ -264,13 +281,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements /* * Updates the {@link FlowRunTable} with Application Metrics */ - private void storeFlowMetricsAppRunning(String clusterId, String userId, - String flowName, long flowRunId, String appId, TimelineEntity te) - throws IOException { + private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey, + String appId, TimelineEntity te) throws IOException { Set<TimelineMetric> metrics = te.getMetrics(); if (metrics != null) { - byte[] rowKey = FlowRunRowKey.getRowKey(clusterId, userId, flowName, - flowRunId); + byte[] rowKey = flowRunRowKey.getRowKey(); storeFlowMetrics(rowKey, metrics, AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId), AggregationOperation.SUM.getAttribute()); @@ -280,8 +295,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics, Attribute... attributes) throws IOException { for (TimelineMetric metric : metrics) { - byte[] metricColumnQualifier = - StringKeyConverter.getInstance().encode(metric.getId()); + byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId()); Map<Long, Number> timeseries = metric.getValues(); for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { Long timestamp = timeseriesEntry.getKey(); @@ -320,8 +334,8 @@ public class HBaseTimelineWriterImpl extends AbstractService implements String compoundValue = Separator.VALUES.joinEncoded(connectedEntity.getValue()); columnPrefix.store(rowKey, table, - StringKeyConverter.getInstance().encode(connectedEntity.getKey()), - null, compoundValue); + stringKeyConverter.encode(connectedEntity.getKey()), null, + compoundValue); } } @@ -341,7 +355,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements if (info != null) { for (Map.Entry<String, Object> entry : info.entrySet()) { ApplicationColumnPrefix.INFO.store(rowKey, applicationTable, - StringKeyConverter.getInstance().encode(entry.getKey()), null, + stringKeyConverter.encode(entry.getKey()), null, entry.getValue()); } } @@ -355,7 +369,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements if (info != null) { for (Map.Entry<String, Object> entry : info.entrySet()) { EntityColumnPrefix.INFO.store(rowKey, entityTable, - StringKeyConverter.getInstance().encode(entry.getKey()), null, + stringKeyConverter.encode(entry.getKey()), null, entry.getValue()); } } @@ -371,8 +385,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements return; } for (Map.Entry<String, String> entry : config.entrySet()) { - byte[] configKey = - StringKeyConverter.getInstance().encode(entry.getKey()); + byte[] configKey = stringKeyConverter.encode(entry.getKey()); if (isApplication) { ApplicationColumnPrefix.CONFIG.store(rowKey, applicationTable, configKey, null, entry.getValue()); @@ -392,7 +405,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements if (metrics != null) { for (TimelineMetric metric : metrics) { byte[] metricColumnQualifier = - StringKeyConverter.getInstance().encode(metric.getId()); + stringKeyConverter.encode(metric.getId()); Map<Long, Number> timeseries = metric.getValues(); for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { Long timestamp = timeseriesEntry.getKey(); @@ -425,12 +438,11 @@ public class HBaseTimelineWriterImpl extends AbstractService implements "! Using the current timestamp"); eventTimestamp = System.currentTimeMillis(); } - EventColumnNameConverter converter = - EventColumnNameConverter.getInstance(); Map<String, Object> eventInfo = event.getInfo(); if ((eventInfo == null) || (eventInfo.size() == 0)) { - byte[] columnQualifierBytes = converter.encode( - new EventColumnName(eventId, eventTimestamp, null)); + byte[] columnQualifierBytes = + new EventColumnName(eventId, eventTimestamp, null) + .getColumnQualifier(); if (isApplication) { ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, columnQualifierBytes, null, Separator.EMPTY_BYTES); @@ -441,9 +453,9 @@ public class HBaseTimelineWriterImpl extends AbstractService implements } else { for (Map.Entry<String, Object> info : eventInfo.entrySet()) { // eventId=infoKey - byte[] columnQualifierBytes = converter.encode( - new EventColumnName(eventId, eventTimestamp, - info.getKey())); + byte[] columnQualifierBytes = + new EventColumnName(eventId, eventTimestamp, info.getKey()) + .getColumnQualifier(); if (isApplication) { ApplicationColumnPrefix.EVENT.store(rowKey, applicationTable, columnQualifierBytes, null, info.getValue()); @@ -459,12 +471,56 @@ public class HBaseTimelineWriterImpl extends AbstractService implements } } + /** + * Checks if the input TimelineEntity object is an ApplicationEntity. + * + * @param te TimelineEntity object. + * @return true if input is an ApplicationEntity, false otherwise + */ + static boolean isApplicationEntity(TimelineEntity te) { + return te.getType().equals(TimelineEntityType.YARN_APPLICATION.toString()); + } + + /** + * @param te TimelineEntity object. + * @param eventId event with this id needs to be fetched + * @return TimelineEvent if TimelineEntity contains the desired event. + */ + private static TimelineEvent getApplicationEvent(TimelineEntity te, + String eventId) { + if (isApplicationEntity(te)) { + for (TimelineEvent event : te.getEvents()) { + if (event.getId().equals(eventId)) { + return event; + } + } + } + return null; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage + * .TimelineWriter#aggregate + * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity, + * org.apache + * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack) + */ @Override public TimelineWriteResponse aggregate(TimelineEntity data, TimelineAggregationTrack track) throws IOException { return null; } + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush + * () + */ @Override public void flush() throws IOException { // flush all buffered mutators http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java index 80fcf8c..dde3911 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java @@ -45,7 +45,7 @@ public enum ApplicationColumn implements Column<ApplicationTable> { * When the application was created. */ CREATED_TIME(ApplicationColumnFamily.INFO, "created_time", - LongConverter.getInstance()), + new LongConverter()), /** * The version of the flow that this app belongs to. http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java index 0febc67..42488f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java @@ -67,8 +67,7 @@ public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> { /** * Metrics are stored with the metric name as the column name. */ - METRIC(ApplicationColumnFamily.METRICS, null, - LongConverter.getInstance()); + METRIC(ApplicationColumnFamily.METRICS, null, new LongConverter()); private final ColumnHelper<ApplicationTable> column; private final ColumnFamily<ApplicationTable> columnFamily; http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java index e476b21..da62fdf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java @@ -18,6 +18,12 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.application; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + /** * Represents a rowkey for the application table. */ @@ -27,6 +33,8 @@ public class ApplicationRowKey { private final String flowName; private final Long flowRunId; private final String appId; + private final KeyConverter<ApplicationRowKey> appRowKeyConverter = + new ApplicationRowKeyConverter(); public ApplicationRowKey(String clusterId, String userId, String flowName, Long flowRunId, String appId) { @@ -58,60 +66,141 @@ public class ApplicationRowKey { } /** - * Constructs a row key prefix for the application table as follows: - * {@code clusterId!userName!flowName!}. - * - * @param clusterId Cluster Id. - * @param userId User Id. - * @param flowName Flow Name. - * @return byte array with the row key prefix - */ - public static byte[] getRowKeyPrefix(String clusterId, String userId, - String flowName) { - return ApplicationRowKeyConverter.getInstance().encode( - new ApplicationRowKey(clusterId, userId, flowName, null, null)); - } - - /** - * Constructs a row key prefix for the application table as follows: - * {@code clusterId!userName!flowName!flowRunId!}. - * - * @param clusterId Cluster Id. - * @param userId User Id. - * @param flowName Flow Name. - * @param flowRunId Run Id for the flow. - * @return byte array with the row key prefix - */ - public static byte[] getRowKeyPrefix(String clusterId, String userId, - String flowName, Long flowRunId) { - return ApplicationRowKeyConverter.getInstance().encode( - new ApplicationRowKey(clusterId, userId, flowName, flowRunId, null)); - } - - /** * Constructs a row key for the application table as follows: * {@code clusterId!userName!flowName!flowRunId!AppId}. * - * @param clusterId Cluster Id. - * @param userId User Id. - * @param flowName Flow Name. - * @param flowRunId Run Id for the flow. - * @param appId App Id. * @return byte array with the row key */ - public static byte[] getRowKey(String clusterId, String userId, - String flowName, Long flowRunId, String appId) { - return ApplicationRowKeyConverter.getInstance().encode( - new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId)); + public byte[] getRowKey() { + return appRowKeyConverter.encode(this); } /** * Given the raw row key as bytes, returns the row key as an object. * - * @param rowKey Byte representation of row key. + * @param rowKey Byte representation of row key. * @return An <cite>ApplicationRowKey</cite> object. */ public static ApplicationRowKey parseRowKey(byte[] rowKey) { - return ApplicationRowKeyConverter.getInstance().decode(rowKey); + return new ApplicationRowKeyConverter().decode(rowKey); } + + /** + * Encodes and decodes row key for application table. The row key is of the + * form: clusterId!userName!flowName!flowRunId!appId. flowRunId is a long, + * appId is encoded and decoded using {@link AppIdKeyConverter} and rest are + * strings. + * <p> + */ + final private static class ApplicationRowKeyConverter implements + KeyConverter<ApplicationRowKey> { + + private final KeyConverter<String> appIDKeyConverter = + new AppIdKeyConverter(); + + /** + * Intended for use in ApplicationRowKey only. + */ + private ApplicationRowKeyConverter() { + } + + /** + * Application row key is of the form + * clusterId!userName!flowName!flowRunId!appId with each segment separated + * by !. The sizes below indicate sizes of each one of these segements in + * sequence. clusterId, userName and flowName are strings. flowrunId is a + * long hence 8 bytes in size. app id is represented as 12 bytes with + * cluster timestamp part of appid takes 8 bytes(long) and seq id takes 4 + * bytes(int). Strings are variable in size (i.e. end whenever separator is + * encountered). This is used while decoding and helps in determining where + * to split. + */ + private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE, + Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG, + AppIdKeyConverter.getKeySize() }; + + /* + * (non-Javadoc) + * + * Encodes ApplicationRowKey object into a byte array with each + * component/field in ApplicationRowKey separated by Separator#QUALIFIERS. + * This leads to an application table row key of the form + * clusterId!userName!flowName!flowRunId!appId If flowRunId in passed + * ApplicationRowKey object is null (and the fields preceding it i.e. + * clusterId, userId and flowName are not null), this returns a row key + * prefix of the form clusterId!userName!flowName! and if appId in + * ApplicationRowKey is null (other 4 components all are not null), this + * returns a row key prefix of the form + * clusterId!userName!flowName!flowRunId! flowRunId is inverted while + * encoding as it helps maintain a descending order for row keys in the + * application table. + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common + * .KeyConverter#encode(java.lang.Object) + */ + @Override + public byte[] encode(ApplicationRowKey rowKey) { + byte[] cluster = + Separator.encode(rowKey.getClusterId(), Separator.SPACE, + Separator.TAB, Separator.QUALIFIERS); + byte[] user = + Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB, + Separator.QUALIFIERS); + byte[] flow = + Separator.encode(rowKey.getFlowName(), Separator.SPACE, + Separator.TAB, Separator.QUALIFIERS); + byte[] first = Separator.QUALIFIERS.join(cluster, user, flow); + // Note that flowRunId is a long, so we can't encode them all at the same + // time. + if (rowKey.getFlowRunId() == null) { + return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES); + } + byte[] second = + Bytes.toBytes(LongConverter.invertLong( + rowKey.getFlowRunId())); + if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) { + return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES); + } + byte[] third = appIDKeyConverter.encode(rowKey.getAppId()); + return Separator.QUALIFIERS.join(first, second, third); + } + + /* + * (non-Javadoc) + * + * Decodes an application row key of the form + * clusterId!userName!flowName!flowRunId!appId represented in byte format + * and converts it into an ApplicationRowKey object.flowRunId is inverted + * while decoding as it was inverted while encoding. + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common + * .KeyConverter#decode(byte[]) + */ + @Override + public ApplicationRowKey decode(byte[] rowKey) { + byte[][] rowKeyComponents = + Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); + if (rowKeyComponents.length != 5) { + throw new IllegalArgumentException("the row key is not valid for " + + "an application"); + } + String clusterId = + Separator.decode(Bytes.toString(rowKeyComponents[0]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + String userId = + Separator.decode(Bytes.toString(rowKeyComponents[1]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + String flowName = + Separator.decode(Bytes.toString(rowKeyComponents[2]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + Long flowRunId = + LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3])); + String appId = appIDKeyConverter.decode(rowKeyComponents[4]); + return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, + appId); + } + } + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java deleted file mode 100644 index 3b054a5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyConverter.java +++ /dev/null @@ -1,130 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.application; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; - -/** - * Encodes and decodes row key for application table. - * The row key is of the form : clusterId!userName!flowName!flowRunId!appId. - * flowRunId is a long, appId is encoded and decoded using - * {@link AppIdKeyConverter} and rest are strings. - */ -public final class ApplicationRowKeyConverter implements - KeyConverter<ApplicationRowKey> { - private static final ApplicationRowKeyConverter INSTANCE = - new ApplicationRowKeyConverter(); - - public static ApplicationRowKeyConverter getInstance() { - return INSTANCE; - } - - private ApplicationRowKeyConverter() { - } - - // Application row key is of the form - // clusterId!userName!flowName!flowRunId!appId with each segment separated - // by !. The sizes below indicate sizes of each one of these segements in - // sequence. clusterId, userName and flowName are strings. flowrunId is a long - // hence 8 bytes in size. app id is represented as 12 bytes with cluster - // timestamp part of appid being 8 bytes(long) and seq id being 4 bytes(int). - // Strings are variable in size (i.e. end whenever separator is encountered). - // This is used while decoding and helps in determining where to split. - private static final int[] SEGMENT_SIZES = { - Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, - Bytes.SIZEOF_LONG, AppIdKeyConverter.getKeySize() }; - - /* - * (non-Javadoc) - * - * Encodes ApplicationRowKey object into a byte array with each - * component/field in ApplicationRowKey separated by Separator#QUALIFIERS. - * This leads to an application table row key of the form - * clusterId!userName!flowName!flowRunId!appId - * If flowRunId in passed ApplicationRowKey object is null (and the fields - * preceding it i.e. clusterId, userId and flowName are not null), this - * returns a row key prefix of the form clusterId!userName!flowName! and if - * appId in ApplicationRowKey is null (other 4 components are not null), this - * returns a row key prefix of the form clusterId!userName!flowName!flowRunId! - * flowRunId is inverted while encoding as it helps maintain a descending - * order for row keys in application table. - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter - * #encode(java.lang.Object) - */ - @Override - public byte[] encode(ApplicationRowKey rowKey) { - byte[] cluster = Separator.encode(rowKey.getClusterId(), - Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); - byte[] user = Separator.encode(rowKey.getUserId(), - Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); - byte[] flow = Separator.encode(rowKey.getFlowName(), - Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); - byte[] first = Separator.QUALIFIERS.join(cluster, user, flow); - // Note that flowRunId is a long, so we can't encode them all at the same - // time. - if (rowKey.getFlowRunId() == null) { - return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES); - } - byte[] second = Bytes.toBytes( - TimelineStorageUtils.invertLong(rowKey.getFlowRunId())); - if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) { - return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES); - } - byte[] third = AppIdKeyConverter.getInstance().encode(rowKey.getAppId()); - return Separator.QUALIFIERS.join(first, second, third); - } - - /* - * (non-Javadoc) - * - * Decodes an application row key of the form - * clusterId!userName!flowName!flowRunId!appId represented in byte format and - * converts it into an ApplicationRowKey object.flowRunId is inverted while - * decoding as it was inverted while encoding. - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter - * #decode(byte[]) - */ - @Override - public ApplicationRowKey decode(byte[] rowKey) { - byte[][] rowKeyComponents = - Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); - if (rowKeyComponents.length != 5) { - throw new IllegalArgumentException("the row key is not valid for " + - "an application"); - } - String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]), - Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - String userId = Separator.decode(Bytes.toString(rowKeyComponents[1]), - Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - String flowName = Separator.decode(Bytes.toString(rowKeyComponents[2]), - Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - Long flowRunId = - TimelineStorageUtils.invertLong(Bytes.toLong(rowKeyComponents[3])); - String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[4]); - return new ApplicationRowKey(clusterId, userId, flowName, flowRunId, appId); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java new file mode 100644 index 0000000..f61b0e9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage.application; + +import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; + +/** + * Represents a partial rowkey (without flowName or without flowName and + * flowRunId) for the application table. + */ +public class ApplicationRowKeyPrefix extends ApplicationRowKey implements + RowKeyPrefix<ApplicationRowKey> { + + /** + * Creates a prefix which generates the following rowKeyPrefixes for the + * application table: {@code clusterId!userName!flowName!}. + * + * @param clusterId the cluster on which applications ran + * @param userId the user that ran applications + * @param flowName the name of the flow that was run by the user on the + * cluster + */ + public ApplicationRowKeyPrefix(String clusterId, String userId, + String flowName) { + super(clusterId, userId, flowName, null, null); + } + + /** + * Creates a prefix which generates the following rowKeyPrefixes for the + * application table: {@code clusterId!userName!flowName!flowRunId!}. + * + * @param clusterId identifying the cluster + * @param userId identifying the user + * @param flowName identifying the flow + * @param flowRunId identifying the instance of this flow + */ + public ApplicationRowKeyPrefix(String clusterId, String userId, + String flowName, Long flowRunId) { + super(clusterId, userId, flowName, flowRunId, null); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.application. + * RowKeyPrefix#getRowKeyPrefix() + */ + @Override + public byte[] getRowKeyPrefix() { + return super.getRowKey(); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java index 6a38e32..8df4407 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKey.java @@ -17,12 +17,19 @@ */ package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; + /** * Represents a rowkey for the app_flow table. */ public class AppToFlowRowKey { private final String clusterId; private final String appId; + private final KeyConverter<AppToFlowRowKey> appToFlowRowKeyConverter = + new AppToFlowRowKeyConverter(); public AppToFlowRowKey(String clusterId, String appId) { this.clusterId = clusterId; @@ -41,13 +48,10 @@ public class AppToFlowRowKey { * Constructs a row key prefix for the app_flow table as follows: * {@code clusterId!AppId}. * - * @param clusterId Cluster Id. - * @param appId Application Id. * @return byte array with the row key */ - public static byte[] getRowKey(String clusterId, String appId) { - return AppToFlowRowKeyConverter.getInstance().encode( - new AppToFlowRowKey(clusterId, appId)); + public byte[] getRowKey() { + return appToFlowRowKeyConverter.encode(this); } /** @@ -57,6 +61,83 @@ public class AppToFlowRowKey { * @return an <cite>AppToFlowRowKey</cite> object. */ public static AppToFlowRowKey parseRowKey(byte[] rowKey) { - return AppToFlowRowKeyConverter.getInstance().decode(rowKey); + return new AppToFlowRowKeyConverter().decode(rowKey); + } + + /** + * Encodes and decodes row key for app_flow table. The row key is of the form + * clusterId!appId. clusterId is a string and appId is encoded/decoded using + * {@link AppIdKeyConverter}. + * <p> + */ + final private static class AppToFlowRowKeyConverter implements + KeyConverter<AppToFlowRowKey> { + + private final KeyConverter<String> appIDKeyConverter = + new AppIdKeyConverter(); + + /** + * Intended for use in AppToFlowRowKey only. + */ + private AppToFlowRowKeyConverter() { + } + + + /** + * App to flow row key is of the form clusterId!appId with the 2 segments + * separated by !. The sizes below indicate sizes of both of these segments + * in sequence. clusterId is a string. appId is represented as 12 bytes w. + * cluster Timestamp part of appid taking 8 bytes(long) and seq id taking 4 + * bytes(int). Strings are variable in size (i.e. end whenever separator is + * encountered). This is used while decoding and helps in determining where + * to split. + */ + private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE, + Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT }; + + /* + * (non-Javadoc) + * + * Encodes AppToFlowRowKey object into a byte array with each + * component/field in AppToFlowRowKey separated by Separator#QUALIFIERS. + * This leads to an app to flow table row key of the form clusterId!appId + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common + * .KeyConverter#encode(java.lang.Object) + */ + @Override + public byte[] encode(AppToFlowRowKey rowKey) { + byte[] first = + Separator.encode(rowKey.getClusterId(), Separator.SPACE, + Separator.TAB, Separator.QUALIFIERS); + byte[] second = appIDKeyConverter.encode(rowKey.getAppId()); + return Separator.QUALIFIERS.join(first, second); + } + + /* + * (non-Javadoc) + * + * Decodes an app to flow row key of the form clusterId!appId represented + * in byte format and converts it into an AppToFlowRowKey object. + * + * @see + * org.apache.hadoop.yarn.server.timelineservice.storage.common + * .KeyConverter#decode(byte[]) + */ + @Override + public AppToFlowRowKey decode(byte[] rowKey) { + byte[][] rowKeyComponents = + Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); + if (rowKeyComponents.length != 2) { + throw new IllegalArgumentException("the row key is not valid for " + + "the app-to-flow table"); + } + String clusterId = + Separator.decode(Bytes.toString(rowKeyComponents[0]), + Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); + String appId = appIDKeyConverter.decode(rowKeyComponents[1]); + return new AppToFlowRowKey(clusterId, appId); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java deleted file mode 100644 index 0f0b879d..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowRowKeyConverter.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; - -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; - -/** - * Encodes and decodes row key for app_flow table. - * The row key is of the form : clusterId!appId. - * clusterId is a string and appId is encoded/decoded using - * {@link AppIdKeyConverter}. - */ -public final class AppToFlowRowKeyConverter - implements KeyConverter<AppToFlowRowKey> { - private static final AppToFlowRowKeyConverter INSTANCE = - new AppToFlowRowKeyConverter(); - - public static AppToFlowRowKeyConverter getInstance() { - return INSTANCE; - } - - private AppToFlowRowKeyConverter() { - } - - // App to flow row key is of the form clusterId!appId with the 2 segments - // separated by !. The sizes below indicate sizes of both of these segments - // in sequence. clusterId is a string. appId is represented as 12 bytes with - // cluster Timestamp part of appid being 8 bytes(long) and seq id being 4 - // bytes(int). - // Strings are variable in size (i.e. end whenever separator is encountered). - // This is used while decoding and helps in determining where to split. - private static final int[] SEGMENT_SIZES = { - Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG + Bytes.SIZEOF_INT }; - - /* - * (non-Javadoc) - * - * Encodes AppToFlowRowKey object into a byte array with each component/field - * in AppToFlowRowKey separated by Separator#QUALIFIERS. This leads to an - * app to flow table row key of the form clusterId!appId - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter - * #encode(java.lang.Object) - */ - @Override - public byte[] encode(AppToFlowRowKey rowKey) { - byte[] first = Separator.encode(rowKey.getClusterId(), - Separator.SPACE, Separator.TAB, Separator.QUALIFIERS); - byte[] second = AppIdKeyConverter.getInstance().encode(rowKey.getAppId()); - return Separator.QUALIFIERS.join(first, second); - } - - /* - * (non-Javadoc) - * - * Decodes an app to flow row key of the form clusterId!appId represented in - * byte format and converts it into an AppToFlowRowKey object. - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter - * #decode(byte[]) - */ - @Override - public AppToFlowRowKey decode(byte[] rowKey) { - byte[][] rowKeyComponents = - Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES); - if (rowKeyComponents.length != 2) { - throw new IllegalArgumentException("the row key is not valid for " + - "the app-to-flow table"); - } - String clusterId = Separator.decode(Bytes.toString(rowKeyComponents[0]), - Separator.QUALIFIERS, Separator.TAB, Separator.SPACE); - String appId = AppIdKeyConverter.getInstance().decode(rowKeyComponents[1]); - return new AppToFlowRowKey(clusterId, appId); - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java index a173b0f..f5f7aa6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/AppIdKeyConverter.java @@ -28,13 +28,8 @@ import org.apache.hadoop.yarn.util.ConverterUtils; * (long - 8 bytes) followed by sequence id section of app id (int - 4 bytes). */ public final class AppIdKeyConverter implements KeyConverter<String> { - private static final AppIdKeyConverter INSTANCE = new AppIdKeyConverter(); - public static AppIdKeyConverter getInstance() { - return INSTANCE; - } - - private AppIdKeyConverter() { + public AppIdKeyConverter() { } /* @@ -58,7 +53,7 @@ public final class AppIdKeyConverter implements KeyConverter<String> { ApplicationId appId = ConverterUtils.toApplicationId(appIdStr); byte[] appIdBytes = new byte[getKeySize()]; byte[] clusterTs = Bytes.toBytes( - TimelineStorageUtils.invertLong(appId.getClusterTimestamp())); + LongConverter.invertLong(appId.getClusterTimestamp())); System.arraycopy(clusterTs, 0, appIdBytes, 0, Bytes.SIZEOF_LONG); byte[] seqId = Bytes.toBytes(TimelineStorageUtils.invertInt(appId.getId())); System.arraycopy(seqId, 0, appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT); @@ -83,7 +78,7 @@ public final class AppIdKeyConverter implements KeyConverter<String> { if (appIdBytes.length != getKeySize()) { throw new IllegalArgumentException("Invalid app id in byte format"); } - long clusterTs = TimelineStorageUtils.invertLong( + long clusterTs = LongConverter.invertLong( Bytes.toLong(appIdBytes, 0, Bytes.SIZEOF_LONG)); int seqId = TimelineStorageUtils.invertInt( Bytes.toInt(appIdBytes, Bytes.SIZEOF_LONG, Bytes.SIZEOF_INT)); http://git-wip-us.apache.org/repos/asf/hadoop/blob/892b193b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java index 6018f86..8445575 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/EventColumnName.java @@ -26,6 +26,8 @@ public class EventColumnName { private final String id; private final Long timestamp; private final String infoKey; + private final KeyConverter<EventColumnName> eventColumnNameConverter = + new EventColumnNameConverter(); public EventColumnName(String id, Long timestamp, String infoKey) { this.id = id; @@ -45,4 +47,17 @@ public class EventColumnName { return infoKey; } + /** + * @return a byte array with each components/fields separated by + * Separator#VALUES. This leads to an event column name of the form + * eventId=timestamp=infokey. If both timestamp and infokey are null, + * then a qualifier of the form eventId=timestamp= is returned. If + * only infokey is null, then a qualifier of the form eventId= is + * returned. These prefix forms are useful for queries that intend to + * retrieve more than one specific column name. + */ + public byte[] getColumnQualifier() { + return eventColumnNameConverter.encode(this); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
