YARN-4986. Add a check in the coprocessor for the table to be operated on (Vrushali C via sjlee)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/29dd0e8c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/29dd0e8c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/29dd0e8c Branch: refs/heads/YARN-2928 Commit: 29dd0e8ccecf2e07997e3dbf984cc335430d4d8b Parents: 74c1b59 Author: Sangjin Lee <[email protected]> Authored: Fri Apr 29 17:13:32 2016 -0700 Committer: Sangjin Lee <[email protected]> Committed: Fri Jul 8 10:19:01 2016 -0700 ---------------------------------------------------------------------- .../storage/common/TimelineStorageUtils.java | 20 +++++++ .../storage/entity/EntityTable.java | 2 +- .../storage/flow/FlowRunCoprocessor.java | 39 +++++++++++-- .../storage/flow/FlowScanner.java | 13 +++-- .../storage/flow/TestHBaseStorageFlowRun.java | 61 ++++++++++++++++++++ .../flow/TestHBaseStorageFlowRunCompaction.java | 36 ++++++++++++ 6 files changed, 160 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/29dd0e8c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java index 2d85bab..18f975a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/common/TimelineStorageUtils.java @@ -32,8 +32,10 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.client.Result; @@ -56,6 +58,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Fiel import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; +import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; import org.apache.hadoop.yarn.util.ConverterUtils; /** @@ -887,4 +890,21 @@ public final class TimelineStorageUtils { Set<TimelineEvent> eventsSet = new HashSet<>(eventsMap.values()); entity.addEvents(eventsSet); } + + public static boolean isFlowRunTable(HRegionInfo hRegionInfo, + Configuration conf) { + String regionTableName = hRegionInfo.getTable().getNameAsString(); + String flowRunTableName = conf.get(FlowRunTable.TABLE_NAME_CONF_NAME, + FlowRunTable.DEFAULT_TABLE_NAME); + if (LOG.isDebugEnabled()) { + LOG.debug("regionTableName=" + regionTableName); + } + if (flowRunTableName.equalsIgnoreCase(regionTableName)) { + if (LOG.isDebugEnabled()) { + LOG.debug(" table is the flow run table!! 
" + flowRunTableName); + } + return true; + } + return false; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/29dd0e8c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java index 3e3e3ab..b194f07 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/entity/EntityTable.java @@ -84,7 +84,7 @@ public class EntityTable extends BaseTable<EntityTable> { + ".table.metrics.ttl"; /** default value for entity table name. */ - private static final String DEFAULT_TABLE_NAME = "timelineservice.entity"; + public static final String DEFAULT_TABLE_NAME = "timelineservice.entity"; /** default TTL is 30 days for metrics timeseries. 
*/ private static final int DEFAULT_METRICS_TTL = 2592000; http://git-wip-us.apache.org/repos/asf/hadoop/blob/29dd0e8c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java index 450640a..8ea51a1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowRunCoprocessor.java @@ -57,6 +57,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimestampGen public class FlowRunCoprocessor extends BaseRegionObserver { private static final Log LOG = LogFactory.getLog(FlowRunCoprocessor.class); + private boolean isFlowRunRegion = false; private HRegion region; /** @@ -70,9 +71,15 @@ public class FlowRunCoprocessor extends BaseRegionObserver { if (e instanceof RegionCoprocessorEnvironment) { RegionCoprocessorEnvironment env = (RegionCoprocessorEnvironment) e; this.region = env.getRegion(); + isFlowRunRegion = TimelineStorageUtils.isFlowRunTable( + region.getRegionInfo(), env.getConfiguration()); } } + public boolean isFlowRunRegion() { + return isFlowRunRegion; + } + /* * (non-Javadoc) * @@ -93,6 +100,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver { WALEdit edit, 
Durability durability) throws IOException { Map<String, byte[]> attributes = put.getAttributesMap(); + if (!isFlowRunRegion) { + return; + } // Assumption is that all the cells in a put are the same operation. List<Tag> tags = new ArrayList<>(); if ((attributes != null) && (attributes.size() > 0)) { @@ -160,6 +170,10 @@ public class FlowRunCoprocessor extends BaseRegionObserver { @Override public void preGetOp(ObserverContext<RegionCoprocessorEnvironment> e, Get get, List<Cell> results) throws IOException { + if (!isFlowRunRegion) { + return; + } + Scan scan = new Scan(get); scan.setMaxVersions(); RegionScanner scanner = null; @@ -190,11 +204,14 @@ public class FlowRunCoprocessor extends BaseRegionObserver { @Override public RegionScanner preScannerOpen( ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, - RegionScanner s) throws IOException { - // set max versions for scan to see all - // versions to aggregate for metrics - scan.setMaxVersions(); - return s; + RegionScanner scanner) throws IOException { + + if (isFlowRunRegion) { + // set max versions for scan to see all + // versions to aggregate for metrics + scan.setMaxVersions(); + } + return scanner; } /* @@ -213,6 +230,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver { public RegionScanner postScannerOpen( ObserverContext<RegionCoprocessorEnvironment> e, Scan scan, RegionScanner scanner) throws IOException { + if (!isFlowRunRegion) { + return scanner; + } return new FlowScanner(e.getEnvironment(), scan.getBatch(), scanner, FlowScannerOperation.READ); } @@ -221,6 +241,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver { public InternalScanner preFlush( ObserverContext<RegionCoprocessorEnvironment> c, Store store, InternalScanner scanner) throws IOException { + if (!isFlowRunRegion) { + return scanner; + } if (LOG.isDebugEnabled()) { if (store != null) { LOG.debug("preFlush store = " + store.getColumnFamilyName() @@ -241,6 +264,9 @@ public class FlowRunCoprocessor extends 
BaseRegionObserver { @Override public void postFlush(ObserverContext<RegionCoprocessorEnvironment> c, Store store, StoreFile resultFile) { + if (!isFlowRunRegion) { + return; + } if (LOG.isDebugEnabled()) { if (store != null) { LOG.debug("postFlush store = " + store.getColumnFamilyName() @@ -262,6 +288,9 @@ public class FlowRunCoprocessor extends BaseRegionObserver { InternalScanner scanner, ScanType scanType, CompactionRequest request) throws IOException { + if (!isFlowRunRegion) { + return scanner; + } FlowScannerOperation requestOp = FlowScannerOperation.MINOR_COMPACTION; if (request != null) { requestOp = (request.isMajor() ? FlowScannerOperation.MAJOR_COMPACTION http://git-wip-us.apache.org/repos/asf/hadoop/blob/29dd0e8c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java index 0ace529..398d7b4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/FlowScanner.java @@ -210,7 +210,7 @@ class FlowScanner implements RegionScanner, Closeable { if (comp.compare(currentColumnQualifier, newColumnQualifier) != 0) { if (converter != null && isNumericConverter(converter)) { addedCnt += emitCells(cells, currentColumnCells, 
currentAggOp, - (NumericValueConverter)converter, currentTimestamp); + converter, currentTimestamp); } resetState(currentColumnCells, alreadySeenAggDim); currentColumnQualifier = newColumnQualifier; @@ -219,6 +219,7 @@ class FlowScanner implements RegionScanner, Closeable { } // No operation needs to be performed on non numeric converters. if (!isNumericConverter(converter)) { + currentColumnCells.add(cell); nextCell(cellLimit); continue; } @@ -228,7 +229,7 @@ class FlowScanner implements RegionScanner, Closeable { } if (!currentColumnCells.isEmpty()) { addedCnt += emitCells(cells, currentColumnCells, currentAggOp, - (NumericValueConverter)converter, currentTimestamp); + converter, currentTimestamp); if (LOG.isDebugEnabled()) { if (addedCnt > 0) { LOG.debug("emitted cells. " + addedCnt + " for " + this.action @@ -345,7 +346,7 @@ class FlowScanner implements RegionScanner, Closeable { * parameter. */ private int emitCells(List<Cell> cells, SortedSet<Cell> currentColumnCells, - AggregationOperation currentAggOp, NumericValueConverter converter, + AggregationOperation currentAggOp, ValueConverter converter, long currentTimestamp) throws IOException { if ((currentColumnCells == null) || (currentColumnCells.size() == 0)) { return 0; @@ -372,12 +373,14 @@ class FlowScanner implements RegionScanner, Closeable { cells.addAll(currentColumnCells); return currentColumnCells.size(); case READ: - Cell sumCell = processSummation(currentColumnCells, converter); + Cell sumCell = processSummation(currentColumnCells, + (NumericValueConverter) converter); cells.add(sumCell); return 1; case MAJOR_COMPACTION: List<Cell> finalCells = processSummationMajorCompaction( - currentColumnCells, converter, currentTimestamp); + currentColumnCells, (NumericValueConverter) converter, + currentTimestamp); cells.addAll(finalCells); return finalCells.size(); default: 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/29dd0e8c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java index a724db2..801d43c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRun.java @@ -19,18 +19,21 @@ package org.apache.hadoop.yarn.server.timelineservice.storage.flow; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.util.EnumSet; +import java.util.List; import java.util.Map; import java.util.Set; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.hbase.client.Get; @@ -38,6 +41,8 @@ import org.apache.hadoop.hbase.client.Result; import 
org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; @@ -57,6 +62,8 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineWriter import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineSchemaCreator; import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field; import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; +import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; @@ -84,6 +91,60 @@ public class TestHBaseStorageFlowRun { TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); } + @Test + public void checkCoProcessorOff() throws IOException, InterruptedException { + Configuration hbaseConf = util.getConfiguration(); + TableName table = TableName.valueOf(hbaseConf.get( + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Admin admin = conn.getAdmin(); + if (admin == null) { + throw new IOException("Can't check tables since admin is null"); + } + if (admin.tableExists(table)) { + // check the regions. 
+ // check in flow run table + util.waitUntilAllRegionsAssigned(table); + HRegionServer server = util.getRSForFirstRegionInTable(table); + List<HRegion> regions = server.getOnlineRegions(table); + for (HRegion region : regions) { + assertTrue(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); + } + } + + table = TableName.valueOf(hbaseConf.get( + FlowActivityTable.TABLE_NAME_CONF_NAME, + FlowActivityTable.DEFAULT_TABLE_NAME)); + if (admin.tableExists(table)) { + // check the regions. + // check in flow activity table + util.waitUntilAllRegionsAssigned(table); + HRegionServer server = util.getRSForFirstRegionInTable(table); + List<HRegion> regions = server.getOnlineRegions(table); + for (HRegion region : regions) { + assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); + } + } + + table = TableName.valueOf(hbaseConf.get( + EntityTable.TABLE_NAME_CONF_NAME, + EntityTable.DEFAULT_TABLE_NAME)); + if (admin.tableExists(table)) { + // check the regions. 
+ // check in entity run table + util.waitUntilAllRegionsAssigned(table); + HRegionServer server = util.getRSForFirstRegionInTable(table); + List<HRegion> regions = server.getOnlineRegions(table); + for (HRegion region : regions) { + assertFalse(TimelineStorageUtils.isFlowRunTable(region.getRegionInfo(), + hbaseConf)); + } + } + } + /** * Writes 4 timeline entities belonging to one flow run through the * {@link HBaseTimelineWriterImpl} http://git-wip-us.apache.org/repos/asf/hadoop/blob/29dd0e8c/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java index 2738e6a..e7e7ba4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/flow/TestHBaseStorageFlowRunCompaction.java @@ -39,6 +39,8 @@ import org.apache.hadoop.hbase.Tag; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Get; +import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import 
org.apache.hadoop.hbase.client.ResultScanner; import org.apache.hadoop.hbase.client.Scan; @@ -87,6 +89,40 @@ public class TestHBaseStorageFlowRunCompaction { TimelineSchemaCreator.createAllTables(util.getConfiguration(), false); } + /** writes non numeric data into flow run table + * reads it back + * + * @throws Exception + */ + @Test + public void testWriteNonNumericData() throws Exception { + String rowKey = "nonNumericRowKey"; + String column = "nonNumericColumnName"; + String value = "nonNumericValue"; + byte[] rowKeyBytes = Bytes.toBytes(rowKey); + byte[] columnNameBytes = Bytes.toBytes(column); + byte[] valueBytes = Bytes.toBytes(value); + Put p = new Put(rowKeyBytes); + p.addColumn(FlowRunColumnFamily.INFO.getBytes(), columnNameBytes, + valueBytes); + Configuration hbaseConf = util.getConfiguration(); + TableName table = TableName.valueOf(hbaseConf.get( + FlowRunTable.TABLE_NAME_CONF_NAME, FlowRunTable.DEFAULT_TABLE_NAME)); + Connection conn = null; + conn = ConnectionFactory.createConnection(hbaseConf); + Table flowRunTable = conn.getTable(table); + flowRunTable.put(p); + + Get g = new Get(rowKeyBytes); + Result r = flowRunTable.get(g); + assertNotNull(r); + assertTrue(r.size() >= 1); + Cell actualValue = r.getColumnLatestCell( + FlowRunColumnFamily.INFO.getBytes(), columnNameBytes); + assertNotNull(CellUtil.cloneValue(actualValue)); + assertEquals(Bytes.toString(CellUtil.cloneValue(actualValue)), value); + } + @Test public void testWriteFlowRunCompaction() throws Exception { String cluster = "kompaction_cluster1"; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
