http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java deleted file mode 100644 index f938185..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ /dev/null @@ -1,593 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.Map; -import java.util.Set; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.hadoop.service.AbstractService; -import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric; -import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse; -import org.apache.hadoop.yarn.server.metrics.ApplicationMetricsConstants; -import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.EventColumnName; -import 
org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.StringKeyConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationCompactionDimension; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.AggregationOperation; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunColumnPrefix; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumn; -import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationColumnPrefix; -import 
org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationRowKey; -import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This implements a hbase based backend for storing the timeline entity - * information. - * It writes to multiple tables at the backend - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class HBaseTimelineWriterImpl extends AbstractService implements - TimelineWriter { - - private static final Logger LOG = LoggerFactory - .getLogger(HBaseTimelineWriterImpl.class); - - private Connection conn; - private TypedBufferedMutator<EntityTable> entityTable; - private TypedBufferedMutator<AppToFlowTable> appToFlowTable; - private TypedBufferedMutator<ApplicationTable> applicationTable; - private TypedBufferedMutator<FlowActivityTable> flowActivityTable; - private TypedBufferedMutator<FlowRunTable> flowRunTable; - private TypedBufferedMutator<SubApplicationTable> subApplicationTable; - - /** - * Used to convert strings key components to and from storage format. - */ - private final KeyConverter<String> stringKeyConverter = - new StringKeyConverter(); - - /** - * Used to convert Long key components to and from storage format. - */ - private final KeyConverter<Long> longKeyConverter = new LongKeyConverter(); - - private enum Tables { - APPLICATION_TABLE, ENTITY_TABLE, SUBAPPLICATION_TABLE - }; - - public HBaseTimelineWriterImpl() { - super(HBaseTimelineWriterImpl.class.getName()); - } - - /** - * initializes the hbase connection to write to the entity table. 
- */ - @Override - protected void serviceInit(Configuration conf) throws Exception { - super.serviceInit(conf); - Configuration hbaseConf = - HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); - conn = ConnectionFactory.createConnection(hbaseConf); - entityTable = new EntityTable().getTableMutator(hbaseConf, conn); - appToFlowTable = new AppToFlowTable().getTableMutator(hbaseConf, conn); - applicationTable = new ApplicationTable().getTableMutator(hbaseConf, conn); - flowRunTable = new FlowRunTable().getTableMutator(hbaseConf, conn); - flowActivityTable = - new FlowActivityTable().getTableMutator(hbaseConf, conn); - subApplicationTable = - new SubApplicationTable().getTableMutator(hbaseConf, conn); - - UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ? - UserGroupInformation.getLoginUser() : - UserGroupInformation.getCurrentUser(); - LOG.info("Initialized HBaseTimelineWriterImpl UGI to " + ugi); - } - - /** - * Stores the entire information in TimelineEntities to the timeline store. - */ - @Override - public TimelineWriteResponse write(TimelineCollectorContext context, - TimelineEntities data, UserGroupInformation callerUgi) - throws IOException { - - TimelineWriteResponse putStatus = new TimelineWriteResponse(); - - String clusterId = context.getClusterId(); - String userId = context.getUserId(); - String flowName = context.getFlowName(); - String flowVersion = context.getFlowVersion(); - long flowRunId = context.getFlowRunId(); - String appId = context.getAppId(); - String subApplicationUser = callerUgi.getShortUserName(); - - // defensive coding to avoid NPE during row key construction - if ((flowName == null) || (appId == null) || (clusterId == null) - || (userId == null)) { - LOG.warn("Found null for one of: flowName=" + flowName + " appId=" + appId - + " userId=" + userId + " clusterId=" + clusterId - + " . 
Not proceeding with writing to hbase"); - return putStatus; - } - - for (TimelineEntity te : data.getEntities()) { - - // a set can have at most 1 null - if (te == null) { - continue; - } - - // if the entity is the application, the destination is the application - // table - boolean isApplication = ApplicationEntity.isApplicationEntity(te); - byte[] rowKey; - if (isApplication) { - ApplicationRowKey applicationRowKey = - new ApplicationRowKey(clusterId, userId, flowName, flowRunId, - appId); - rowKey = applicationRowKey.getRowKey(); - store(rowKey, te, flowVersion, Tables.APPLICATION_TABLE); - } else { - EntityRowKey entityRowKey = - new EntityRowKey(clusterId, userId, flowName, flowRunId, appId, - te.getType(), te.getIdPrefix(), te.getId()); - rowKey = entityRowKey.getRowKey(); - store(rowKey, te, flowVersion, Tables.ENTITY_TABLE); - } - - if (!isApplication && !userId.equals(subApplicationUser)) { - SubApplicationRowKey subApplicationRowKey = - new SubApplicationRowKey(subApplicationUser, clusterId, - te.getType(), te.getIdPrefix(), te.getId(), userId); - rowKey = subApplicationRowKey.getRowKey(); - store(rowKey, te, flowVersion, Tables.SUBAPPLICATION_TABLE); - } - - if (isApplication) { - TimelineEvent event = - ApplicationEntity.getApplicationEvent(te, - ApplicationMetricsConstants.CREATED_EVENT_TYPE); - FlowRunRowKey flowRunRowKey = - new FlowRunRowKey(clusterId, userId, flowName, flowRunId); - if (event != null) { - onApplicationCreated(flowRunRowKey, clusterId, appId, userId, - flowVersion, te, event.getTimestamp()); - } - // if it's an application entity, store metrics - storeFlowMetricsAppRunning(flowRunRowKey, appId, te); - // if application has finished, store it's finish time and write final - // values of all metrics - event = ApplicationEntity.getApplicationEvent(te, - ApplicationMetricsConstants.FINISHED_EVENT_TYPE); - if (event != null) { - onApplicationFinished(flowRunRowKey, flowVersion, appId, te, - event.getTimestamp()); - } - } - } - return 
putStatus; - } - - private void onApplicationCreated(FlowRunRowKey flowRunRowKey, - String clusterId, String appId, String userId, String flowVersion, - TimelineEntity te, long appCreatedTimeStamp) - throws IOException { - - String flowName = flowRunRowKey.getFlowName(); - Long flowRunId = flowRunRowKey.getFlowRunId(); - - // store in App to flow table - AppToFlowRowKey appToFlowRowKey = new AppToFlowRowKey(appId); - byte[] rowKey = appToFlowRowKey.getRowKey(); - AppToFlowColumnPrefix.FLOW_NAME.store(rowKey, appToFlowTable, clusterId, - null, flowName); - AppToFlowColumnPrefix.FLOW_RUN_ID.store(rowKey, appToFlowTable, clusterId, - null, flowRunId); - AppToFlowColumnPrefix.USER_ID.store(rowKey, appToFlowTable, clusterId, null, - userId); - - // store in flow run table - storeAppCreatedInFlowRunTable(flowRunRowKey, appId, te); - - // store in flow activity table - byte[] flowActivityRowKeyBytes = - new FlowActivityRowKey(flowRunRowKey.getClusterId(), - appCreatedTimeStamp, flowRunRowKey.getUserId(), flowName) - .getRowKey(); - byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); - FlowActivityColumnPrefix.RUN_ID.store(flowActivityRowKeyBytes, - flowActivityTable, qualifier, null, flowVersion, - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); - } - - /* - * updates the {@link FlowRunTable} with Application Created information - */ - private void storeAppCreatedInFlowRunTable(FlowRunRowKey flowRunRowKey, - String appId, TimelineEntity te) throws IOException { - byte[] rowKey = flowRunRowKey.getRowKey(); - FlowRunColumn.MIN_START_TIME.store(rowKey, flowRunTable, null, - te.getCreatedTime(), - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); - } - - - /* - * updates the {@link FlowRunTable} and {@link FlowActivityTable} when an - * application has finished - */ - private void onApplicationFinished(FlowRunRowKey flowRunRowKey, - String flowVersion, String appId, TimelineEntity te, - long appFinishedTimeStamp) 
throws IOException { - // store in flow run table - storeAppFinishedInFlowRunTable(flowRunRowKey, appId, te, - appFinishedTimeStamp); - - // indicate in the flow activity table that the app has finished - byte[] rowKey = - new FlowActivityRowKey(flowRunRowKey.getClusterId(), - appFinishedTimeStamp, flowRunRowKey.getUserId(), - flowRunRowKey.getFlowName()).getRowKey(); - byte[] qualifier = longKeyConverter.encode(flowRunRowKey.getFlowRunId()); - FlowActivityColumnPrefix.RUN_ID.store(rowKey, flowActivityTable, qualifier, - null, flowVersion, - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId)); - } - - /* - * Update the {@link FlowRunTable} with Application Finished information - */ - private void storeAppFinishedInFlowRunTable(FlowRunRowKey flowRunRowKey, - String appId, TimelineEntity te, long appFinishedTimeStamp) - throws IOException { - byte[] rowKey = flowRunRowKey.getRowKey(); - Attribute attributeAppId = - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId); - FlowRunColumn.MAX_END_TIME.store(rowKey, flowRunTable, null, - appFinishedTimeStamp, attributeAppId); - - // store the final value of metrics since application has finished - Set<TimelineMetric> metrics = te.getMetrics(); - if (metrics != null) { - storeFlowMetrics(rowKey, metrics, attributeAppId, - AggregationOperation.SUM_FINAL.getAttribute()); - } - } - - /* - * Updates the {@link FlowRunTable} with Application Metrics - */ - private void storeFlowMetricsAppRunning(FlowRunRowKey flowRunRowKey, - String appId, TimelineEntity te) throws IOException { - Set<TimelineMetric> metrics = te.getMetrics(); - if (metrics != null) { - byte[] rowKey = flowRunRowKey.getRowKey(); - storeFlowMetrics(rowKey, metrics, - AggregationCompactionDimension.APPLICATION_ID.getAttribute(appId), - AggregationOperation.SUM.getAttribute()); - } - } - - private void storeFlowMetrics(byte[] rowKey, Set<TimelineMetric> metrics, - Attribute... 
attributes) throws IOException { - for (TimelineMetric metric : metrics) { - byte[] metricColumnQualifier = stringKeyConverter.encode(metric.getId()); - Map<Long, Number> timeseries = metric.getValues(); - for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { - Long timestamp = timeseriesEntry.getKey(); - FlowRunColumnPrefix.METRIC.store(rowKey, flowRunTable, - metricColumnQualifier, timestamp, timeseriesEntry.getValue(), - attributes); - } - } - } - - /** - * Stores the Relations from the {@linkplain TimelineEntity} object. - */ - private <T> void storeRelations(byte[] rowKey, - Map<String, Set<String>> connectedEntities, ColumnPrefix<T> columnPrefix, - TypedBufferedMutator<T> table) throws IOException { - if (connectedEntities != null) { - for (Map.Entry<String, Set<String>> connectedEntity : connectedEntities - .entrySet()) { - // id3?id4?id5 - String compoundValue = - Separator.VALUES.joinEncoded(connectedEntity.getValue()); - columnPrefix.store(rowKey, table, - stringKeyConverter.encode(connectedEntity.getKey()), null, - compoundValue); - } - } - } - - /** - * Stores information from the {@linkplain TimelineEntity} object. 
- */ - private void store(byte[] rowKey, TimelineEntity te, - String flowVersion, - Tables table) throws IOException { - switch (table) { - case APPLICATION_TABLE: - ApplicationColumn.ID.store(rowKey, applicationTable, null, te.getId()); - ApplicationColumn.CREATED_TIME.store(rowKey, applicationTable, null, - te.getCreatedTime()); - ApplicationColumn.FLOW_VERSION.store(rowKey, applicationTable, null, - flowVersion); - storeInfo(rowKey, te.getInfo(), flowVersion, ApplicationColumnPrefix.INFO, - applicationTable); - storeMetrics(rowKey, te.getMetrics(), ApplicationColumnPrefix.METRIC, - applicationTable); - storeEvents(rowKey, te.getEvents(), ApplicationColumnPrefix.EVENT, - applicationTable); - storeConfig(rowKey, te.getConfigs(), ApplicationColumnPrefix.CONFIG, - applicationTable); - storeRelations(rowKey, te.getIsRelatedToEntities(), - ApplicationColumnPrefix.IS_RELATED_TO, applicationTable); - storeRelations(rowKey, te.getRelatesToEntities(), - ApplicationColumnPrefix.RELATES_TO, applicationTable); - break; - case ENTITY_TABLE: - EntityColumn.ID.store(rowKey, entityTable, null, te.getId()); - EntityColumn.TYPE.store(rowKey, entityTable, null, te.getType()); - EntityColumn.CREATED_TIME.store(rowKey, entityTable, null, - te.getCreatedTime()); - EntityColumn.FLOW_VERSION.store(rowKey, entityTable, null, flowVersion); - storeInfo(rowKey, te.getInfo(), flowVersion, EntityColumnPrefix.INFO, - entityTable); - storeMetrics(rowKey, te.getMetrics(), EntityColumnPrefix.METRIC, - entityTable); - storeEvents(rowKey, te.getEvents(), EntityColumnPrefix.EVENT, - entityTable); - storeConfig(rowKey, te.getConfigs(), EntityColumnPrefix.CONFIG, - entityTable); - storeRelations(rowKey, te.getIsRelatedToEntities(), - EntityColumnPrefix.IS_RELATED_TO, entityTable); - storeRelations(rowKey, te.getRelatesToEntities(), - EntityColumnPrefix.RELATES_TO, entityTable); - break; - case SUBAPPLICATION_TABLE: - SubApplicationColumn.ID.store(rowKey, subApplicationTable, null, - te.getId()); - 
SubApplicationColumn.TYPE.store(rowKey, subApplicationTable, null, - te.getType()); - SubApplicationColumn.CREATED_TIME.store(rowKey, subApplicationTable, null, - te.getCreatedTime()); - SubApplicationColumn.FLOW_VERSION.store(rowKey, subApplicationTable, null, - flowVersion); - storeInfo(rowKey, te.getInfo(), flowVersion, - SubApplicationColumnPrefix.INFO, subApplicationTable); - storeMetrics(rowKey, te.getMetrics(), SubApplicationColumnPrefix.METRIC, - subApplicationTable); - storeEvents(rowKey, te.getEvents(), SubApplicationColumnPrefix.EVENT, - subApplicationTable); - storeConfig(rowKey, te.getConfigs(), SubApplicationColumnPrefix.CONFIG, - subApplicationTable); - storeRelations(rowKey, te.getIsRelatedToEntities(), - SubApplicationColumnPrefix.IS_RELATED_TO, subApplicationTable); - storeRelations(rowKey, te.getRelatesToEntities(), - SubApplicationColumnPrefix.RELATES_TO, subApplicationTable); - break; - default: - LOG.info("Invalid table name provided."); - break; - } - } - - /** - * stores the info information from {@linkplain TimelineEntity}. - */ - private <T> void storeInfo(byte[] rowKey, Map<String, Object> info, - String flowVersion, ColumnPrefix<T> columnPrefix, - TypedBufferedMutator<T> table) throws IOException { - if (info != null) { - for (Map.Entry<String, Object> entry : info.entrySet()) { - columnPrefix.store(rowKey, table, - stringKeyConverter.encode(entry.getKey()), null, entry.getValue()); - } - } - } - - /** - * stores the config information from {@linkplain TimelineEntity}. 
- */ - private <T> void storeConfig(byte[] rowKey, Map<String, String> config, - ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) - throws IOException { - if (config != null) { - for (Map.Entry<String, String> entry : config.entrySet()) { - byte[] configKey = stringKeyConverter.encode(entry.getKey()); - columnPrefix.store(rowKey, table, configKey, null, entry.getValue()); - } - } - } - - /** - * stores the {@linkplain TimelineMetric} information from the - * {@linkplain TimelineEvent} object. - */ - private <T> void storeMetrics(byte[] rowKey, Set<TimelineMetric> metrics, - ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) - throws IOException { - if (metrics != null) { - for (TimelineMetric metric : metrics) { - byte[] metricColumnQualifier = - stringKeyConverter.encode(metric.getId()); - Map<Long, Number> timeseries = metric.getValues(); - for (Map.Entry<Long, Number> timeseriesEntry : timeseries.entrySet()) { - Long timestamp = timeseriesEntry.getKey(); - columnPrefix.store(rowKey, table, metricColumnQualifier, timestamp, - timeseriesEntry.getValue()); - } - } - } - } - - /** - * Stores the events from the {@linkplain TimelineEvent} object. - */ - private <T> void storeEvents(byte[] rowKey, Set<TimelineEvent> events, - ColumnPrefix<T> columnPrefix, TypedBufferedMutator<T> table) - throws IOException { - if (events != null) { - for (TimelineEvent event : events) { - if (event != null) { - String eventId = event.getId(); - if (eventId != null) { - long eventTimestamp = event.getTimestamp(); - // if the timestamp is not set, use the current timestamp - if (eventTimestamp == TimelineEvent.INVALID_TIMESTAMP) { - LOG.warn("timestamp is not set for event " + eventId + - "! 
Using the current timestamp"); - eventTimestamp = System.currentTimeMillis(); - } - Map<String, Object> eventInfo = event.getInfo(); - if ((eventInfo == null) || (eventInfo.size() == 0)) { - byte[] columnQualifierBytes = - new EventColumnName(eventId, eventTimestamp, null) - .getColumnQualifier(); - columnPrefix.store(rowKey, table, columnQualifierBytes, null, - Separator.EMPTY_BYTES); - } else { - for (Map.Entry<String, Object> info : eventInfo.entrySet()) { - // eventId=infoKey - byte[] columnQualifierBytes = - new EventColumnName(eventId, eventTimestamp, info.getKey()) - .getColumnQualifier(); - columnPrefix.store(rowKey, table, columnQualifierBytes, null, - info.getValue()); - } // for info: eventInfo - } - } - } - } // event : events - } - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage - * .TimelineWriter#aggregate - * (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity, - * org.apache - * .hadoop.yarn.server.timelineservice.storage.TimelineAggregationTrack) - */ - @Override - public TimelineWriteResponse aggregate(TimelineEntity data, - TimelineAggregationTrack track) throws IOException { - return null; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter#flush - * () - */ - @Override - public void flush() throws IOException { - // flush all buffered mutators - entityTable.flush(); - appToFlowTable.flush(); - applicationTable.flush(); - flowRunTable.flush(); - flowActivityTable.flush(); - subApplicationTable.flush(); - } - - /** - * close the hbase connections The close APIs perform flushing and release any - * resources held. 
- */ - @Override - protected void serviceStop() throws Exception { - if (entityTable != null) { - LOG.info("closing the entity table"); - // The close API performs flushing and releases any resources held - entityTable.close(); - } - if (appToFlowTable != null) { - LOG.info("closing the app_flow table"); - // The close API performs flushing and releases any resources held - appToFlowTable.close(); - } - if (applicationTable != null) { - LOG.info("closing the application table"); - applicationTable.close(); - } - if (flowRunTable != null) { - LOG.info("closing the flow run table"); - // The close API performs flushing and releases any resources held - flowRunTable.close(); - } - if (flowActivityTable != null) { - LOG.info("closing the flowActivityTable table"); - // The close API performs flushing and releases any resources held - flowActivityTable.close(); - } - if (subApplicationTable != null) { - subApplicationTable.close(); - } - if (conn != null) { - LOG.info("closing the hbase Connection"); - conn.close(); - } - super.serviceStop(); - } -}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java deleted file mode 100644 index c9f7cec..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineSchemaCreator.java +++ /dev/null @@ -1,367 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.yarn.server.timelineservice.storage; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.CommandLineParser; -import org.apache.commons.cli.HelpFormatter; -import org.apache.commons.cli.Option; -import org.apache.commons.cli.Options; -import org.apache.commons.cli.ParseException; -import org.apache.commons.cli.PosixParser; -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Connection; -import org.apache.hadoop.hbase.client.ConnectionFactory; -import org.apache.hadoop.util.GenericOptionsParser; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.storage.application.ApplicationTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow.AppToFlowTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.HBaseTimelineStorageUtils; -import org.apache.hadoop.yarn.server.timelineservice.storage.entity.EntityTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowActivityTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.FlowRunTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.subapplication.SubApplicationTable; - -import com.google.common.annotations.VisibleForTesting; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * This creates the schema for a hbase based backend for storing application - * timeline information. 
- */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public final class TimelineSchemaCreator { - private TimelineSchemaCreator() { - } - - final static String NAME = TimelineSchemaCreator.class.getSimpleName(); - private static final Logger LOG = - LoggerFactory.getLogger(TimelineSchemaCreator.class); - private static final String SKIP_EXISTING_TABLE_OPTION_SHORT = "s"; - private static final String APP_METRICS_TTL_OPTION_SHORT = "ma"; - private static final String SUB_APP_METRICS_TTL_OPTION_SHORT = "msa"; - private static final String APP_TABLE_NAME_SHORT = "a"; - private static final String SUB_APP_TABLE_NAME_SHORT = "sa"; - private static final String APP_TO_FLOW_TABLE_NAME_SHORT = "a2f"; - private static final String ENTITY_METRICS_TTL_OPTION_SHORT = "me"; - private static final String ENTITY_TABLE_NAME_SHORT = "e"; - private static final String HELP_SHORT = "h"; - private static final String CREATE_TABLES_SHORT = "c"; - - public static void main(String[] args) throws Exception { - - LOG.info("Starting the schema creation"); - Configuration hbaseConf = - HBaseTimelineStorageUtils.getTimelineServiceHBaseConf( - new YarnConfiguration()); - // Grab input args and allow for -Dxyz style arguments - String[] otherArgs = new GenericOptionsParser(hbaseConf, args) - .getRemainingArgs(); - - // Grab the arguments we're looking for. 
- CommandLine commandLine = parseArgs(otherArgs); - - if (commandLine.hasOption(HELP_SHORT)) { - // -help option has the highest precedence - printUsage(); - } else if (commandLine.hasOption(CREATE_TABLES_SHORT)) { - // Grab the entityTableName argument - String entityTableName = commandLine.getOptionValue( - ENTITY_TABLE_NAME_SHORT); - if (StringUtils.isNotBlank(entityTableName)) { - hbaseConf.set(EntityTable.TABLE_NAME_CONF_NAME, entityTableName); - } - // Grab the entity metrics TTL - String entityTableMetricsTTL = commandLine.getOptionValue( - ENTITY_METRICS_TTL_OPTION_SHORT); - if (StringUtils.isNotBlank(entityTableMetricsTTL)) { - int entityMetricsTTL = Integer.parseInt(entityTableMetricsTTL); - new EntityTable().setMetricsTTL(entityMetricsTTL, hbaseConf); - } - // Grab the appToflowTableName argument - String appToflowTableName = commandLine.getOptionValue( - APP_TO_FLOW_TABLE_NAME_SHORT); - if (StringUtils.isNotBlank(appToflowTableName)) { - hbaseConf.set(AppToFlowTable.TABLE_NAME_CONF_NAME, appToflowTableName); - } - // Grab the applicationTableName argument - String applicationTableName = commandLine.getOptionValue( - APP_TABLE_NAME_SHORT); - if (StringUtils.isNotBlank(applicationTableName)) { - hbaseConf.set(ApplicationTable.TABLE_NAME_CONF_NAME, - applicationTableName); - } - // Grab the application metrics TTL - String applicationTableMetricsTTL = commandLine.getOptionValue( - APP_METRICS_TTL_OPTION_SHORT); - if (StringUtils.isNotBlank(applicationTableMetricsTTL)) { - int appMetricsTTL = Integer.parseInt(applicationTableMetricsTTL); - new ApplicationTable().setMetricsTTL(appMetricsTTL, hbaseConf); - } - - // Grab the subApplicationTableName argument - String subApplicationTableName = commandLine.getOptionValue( - SUB_APP_TABLE_NAME_SHORT); - if (StringUtils.isNotBlank(subApplicationTableName)) { - hbaseConf.set(SubApplicationTable.TABLE_NAME_CONF_NAME, - subApplicationTableName); - } - // Grab the subApplication metrics TTL - String 
subApplicationTableMetricsTTL = commandLine - .getOptionValue(SUB_APP_METRICS_TTL_OPTION_SHORT); - if (StringUtils.isNotBlank(subApplicationTableMetricsTTL)) { - int subAppMetricsTTL = Integer.parseInt(subApplicationTableMetricsTTL); - new SubApplicationTable().setMetricsTTL(subAppMetricsTTL, hbaseConf); - } - - // create all table schemas in hbase - final boolean skipExisting = commandLine.hasOption( - SKIP_EXISTING_TABLE_OPTION_SHORT); - createAllSchemas(hbaseConf, skipExisting); - } else { - // print usage information if -create is not specified - printUsage(); - } - } - - /** - * Parse command-line arguments. - * - * @param args - * command line arguments passed to program. - * @return parsed command line. - * @throws ParseException - */ - private static CommandLine parseArgs(String[] args) throws ParseException { - Options options = new Options(); - - // Input - Option o = new Option(HELP_SHORT, "help", false, "print help information"); - o.setRequired(false); - options.addOption(o); - - o = new Option(CREATE_TABLES_SHORT, "create", false, - "a mandatory option to create hbase tables"); - o.setRequired(false); - options.addOption(o); - - o = new Option(ENTITY_TABLE_NAME_SHORT, "entityTableName", true, - "entity table name"); - o.setArgName("entityTableName"); - o.setRequired(false); - options.addOption(o); - - o = new Option(ENTITY_METRICS_TTL_OPTION_SHORT, "entityMetricsTTL", true, - "TTL for metrics column family"); - o.setArgName("entityMetricsTTL"); - o.setRequired(false); - options.addOption(o); - - o = new Option(APP_TO_FLOW_TABLE_NAME_SHORT, "appToflowTableName", true, - "app to flow table name"); - o.setArgName("appToflowTableName"); - o.setRequired(false); - options.addOption(o); - - o = new Option(APP_TABLE_NAME_SHORT, "applicationTableName", true, - "application table name"); - o.setArgName("applicationTableName"); - o.setRequired(false); - options.addOption(o); - - o = new Option(APP_METRICS_TTL_OPTION_SHORT, "applicationMetricsTTL", true, - "TTL 
for metrics column family"); - o.setArgName("applicationMetricsTTL"); - o.setRequired(false); - options.addOption(o); - - o = new Option(SUB_APP_TABLE_NAME_SHORT, "subApplicationTableName", true, - "subApplication table name"); - o.setArgName("subApplicationTableName"); - o.setRequired(false); - options.addOption(o); - - o = new Option(SUB_APP_METRICS_TTL_OPTION_SHORT, "subApplicationMetricsTTL", - true, "TTL for metrics column family"); - o.setArgName("subApplicationMetricsTTL"); - o.setRequired(false); - options.addOption(o); - - // Options without an argument - // No need to set arg name since we do not need an argument here - o = new Option(SKIP_EXISTING_TABLE_OPTION_SHORT, "skipExistingTable", - false, "skip existing Hbase tables and continue to create new tables"); - o.setRequired(false); - options.addOption(o); - - CommandLineParser parser = new PosixParser(); - CommandLine commandLine = null; - try { - commandLine = parser.parse(options, args); - } catch (Exception e) { - LOG.error("ERROR: " + e.getMessage() + "\n"); - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp(NAME + " ", options, true); - System.exit(-1); - } - - return commandLine; - } - - private static void printUsage() { - StringBuilder usage = new StringBuilder("Command Usage: \n"); - usage.append("TimelineSchemaCreator [-help] Display help info" + - " for all commands. 
Or\n"); - usage.append("TimelineSchemaCreator -create [OPTIONAL_OPTIONS]" + - " Create hbase tables.\n\n"); - usage.append("The Optional options for creating tables include: \n"); - usage.append("[-entityTableName <Entity Table Name>] " + - "The name of the Entity table\n"); - usage.append("[-entityMetricsTTL <Entity Table Metrics TTL>]" + - " TTL for metrics in the Entity table\n"); - usage.append("[-appToflowTableName <AppToflow Table Name>]" + - " The name of the AppToFlow table\n"); - usage.append("[-applicationTableName <Application Table Name>]" + - " The name of the Application table\n"); - usage.append("[-applicationMetricsTTL <Application Table Metrics TTL>]" + - " TTL for metrics in the Application table\n"); - usage.append("[-subApplicationTableName <SubApplication Table Name>]" + - " The name of the SubApplication table\n"); - usage.append("[-subApplicationMetricsTTL " + - " <SubApplication Table Metrics TTL>]" + - " TTL for metrics in the SubApplication table\n"); - usage.append("[-skipExistingTable] Whether to skip existing" + - " hbase tables\n"); - System.out.println(usage.toString()); - } - - /** - * Create all table schemas and log success or exception if failed. - * @param hbaseConf the hbase configuration to create tables with - * @param skipExisting whether to skip existing hbase tables - */ - private static void createAllSchemas(Configuration hbaseConf, - boolean skipExisting) { - List<Exception> exceptions = new ArrayList<>(); - try { - if (skipExisting) { - LOG.info("Will skip existing tables and continue on htable creation " - + "exceptions!"); - } - createAllTables(hbaseConf, skipExisting); - LOG.info("Successfully created HBase schema. 
"); - } catch (IOException e) { - LOG.error("Error in creating hbase tables: ", e); - exceptions.add(e); - } - - if (exceptions.size() > 0) { - LOG.warn("Schema creation finished with the following exceptions"); - for (Exception e : exceptions) { - LOG.warn(e.getMessage()); - } - System.exit(-1); - } else { - LOG.info("Schema creation finished successfully"); - } - } - - @VisibleForTesting - public static void createAllTables(Configuration hbaseConf, - boolean skipExisting) throws IOException { - - Connection conn = null; - try { - conn = ConnectionFactory.createConnection(hbaseConf); - Admin admin = conn.getAdmin(); - if (admin == null) { - throw new IOException("Cannot create table since admin is null"); - } - try { - new EntityTable().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new AppToFlowTable().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new ApplicationTable().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new FlowRunTable().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new FlowActivityTable().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - try { - new SubApplicationTable().createTable(admin, hbaseConf); - } catch (IOException e) { - if (skipExisting) { - LOG.warn("Skip and continue on: " + e.getMessage()); - } else { - throw e; - } - } - } finally { - if (conn != null) { - conn.close(); - } - } - } - - -} 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java deleted file mode 100644 index 00eaa7e..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumn.java +++ /dev/null @@ -1,108 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.hadoop.yarn.server.timelineservice.storage.application;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;

/**
 * The fully qualified columns of the {@link ApplicationTable}. Each constant
 * pairs a column family with a fixed qualifier and a value converter.
 */
public enum ApplicationColumn implements Column<ApplicationTable> {

  /** App id. */
  ID(ApplicationColumnFamily.INFO, "id"),

  /** When the application was created. */
  CREATED_TIME(ApplicationColumnFamily.INFO, "created_time",
      new LongConverter()),

  /** The version of the flow that this app belongs to. */
  FLOW_VERSION(ApplicationColumnFamily.INFO, "flow_version");

  private final ColumnHelper<ApplicationTable> column;
  private final ColumnFamily<ApplicationTable> columnFamily;
  private final String columnQualifier;
  private final byte[] columnQualifierBytes;

  /**
   * Creates a column whose values go through the shared
   * {@link GenericConverter} instance.
   *
   * @param family column family the column lives in.
   * @param qualifier qualifier of the column.
   */
  ApplicationColumn(ColumnFamily<ApplicationTable> family, String qualifier) {
    this(family, qualifier, GenericConverter.getInstance());
  }

  /**
   * Creates a column with an explicit value converter.
   *
   * @param family column family the column lives in.
   * @param qualifier qualifier of the column.
   * @param valueConverter converter handed to the {@link ColumnHelper}.
   */
  ApplicationColumn(ColumnFamily<ApplicationTable> family, String qualifier,
      ValueConverter valueConverter) {
    columnFamily = family;
    columnQualifier = qualifier;
    column = new ColumnHelper<>(family, valueConverter);
    // The qualifier is run through the SPACE separator encoding before it is
    // turned into bytes, keeping column name hygiene future-proof.
    columnQualifierBytes = Bytes.toBytes(Separator.SPACE.encode(qualifier));
  }

  /**
   * @return the column name value
   */
  private String getColumnQualifier() {
    return columnQualifier;
  }

  /**
   * Stores a value in this column by delegating to the column helper with
   * this column's qualifier bytes.
   *
   * @param rowKey row to write to.
   * @param tableMutator mutator used for the write.
   * @param timestamp timestamp passed through to the helper.
   * @param inputValue value passed through to the helper.
   * @param attributes attributes passed through to the helper.
   * @throws IOException if the helper fails to store the value.
   */
  public void store(byte[] rowKey,
      TypedBufferedMutator<ApplicationTable> tableMutator, Long timestamp,
      Object inputValue, Attribute... attributes) throws IOException {
    column.store(rowKey, tableMutator, columnQualifierBytes, timestamp,
        inputValue, attributes);
  }

  /**
   * Reads this column from a result by delegating to the column helper.
   *
   * @param result result to read from.
   * @return whatever the helper returns for this column's qualifier.
   * @throws IOException if the helper fails to read the value.
   */
  public Object readResult(Result result) throws IOException {
    return column.readResult(result, columnQualifierBytes);
  }

  /**
   * @return a copy of the encoded qualifier bytes.
   */
  @Override
  public byte[] getColumnQualifierBytes() {
    return columnQualifierBytes.clone();
  }

  /**
   * @return the bytes of the column family this column belongs to.
   */
  @Override
  public byte[] getColumnFamilyBytes() {
    return columnFamily.getBytes();
  }

  /**
   * @return the value converter held by the column helper.
   */
  @Override
  public ValueConverter getValueConverter() {
    return column.getValueConverter();
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java deleted file mode 100644 index 97e5f7b..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnFamily.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */
package org.apache.hadoop.yarn.server.timelineservice.storage.application;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;

/**
 * Represents the application table column families.
 */
public enum ApplicationColumnFamily implements ColumnFamily<ApplicationTable> {

  /**
   * Info column family houses known columns, specifically ones included in
   * columnfamily filters.
   */
  INFO("i"),

  /**
   * Configurations are in a separate column family for two reasons: a) the size
   * of the config values can be very large and b) we expect that config values
   * are often separately accessed from other metrics and info columns.
   */
  CONFIGS("c"),

  /**
   * Metrics have a separate column family, because they have a separate TTL.
   */
  METRICS("m");

  /**
   * Byte representation of this column family.
   */
  private final byte[] bytes;

  /**
   * @param value create a column family with this name. Must be lower case and
   *          without spaces.
   */
  private ApplicationColumnFamily(String value) {
    // column families should be lower case and not contain any spaces; the
    // name is still passed through the SPACE separator encoding before it is
    // converted to bytes.
    this.bytes = Bytes.toBytes(Separator.SPACE.encode(value));
  }

  /**
   * Returns the byte representation of this column family.
   *
   * @return a copy of the family bytes, so callers cannot modify the array
   *         held by this constant.
   */
  @Override
  public byte[] getBytes() {
    return Bytes.copy(bytes);
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java deleted file mode 100644 index 8297dc5..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationColumnPrefix.java +++ /dev/null @@ -1,236 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */
package org.apache.hadoop.yarn.server.timelineservice.storage.application;

import java.io.IOException;
import java.util.Map;
import java.util.NavigableMap;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.GenericConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute;

/**
 * The partially qualified columns of the application table: a column family
 * plus an optional prefix that is combined with a caller-supplied qualifier.
 */
public enum ApplicationColumnPrefix implements ColumnPrefix<ApplicationTable> {

  /** To store TimelineEntity getIsRelatedToEntities values. */
  IS_RELATED_TO(ApplicationColumnFamily.INFO, "s"),

  /** To store TimelineEntity getRelatesToEntities values. */
  RELATES_TO(ApplicationColumnFamily.INFO, "r"),

  /** To store TimelineEntity info values. */
  INFO(ApplicationColumnFamily.INFO, "i"),

  /** Lifecycle events for an application. */
  EVENT(ApplicationColumnFamily.INFO, "e"),

  /** Config column stores configuration with config key as the column name. */
  CONFIG(ApplicationColumnFamily.CONFIGS, null),

  /** Metrics are stored with the metric name as the column name. */
  METRIC(ApplicationColumnFamily.METRICS, null, new LongConverter());

  private final ColumnHelper<ApplicationTable> column;
  private final ColumnFamily<ApplicationTable> columnFamily;

  /**
   * Can be null for those cases where the provided column qualifier is the
   * entire column name.
   */
  private final String columnPrefix;
  private final byte[] columnPrefixBytes;

  /**
   * Creates a prefix whose values go through the shared
   * {@link GenericConverter} instance.
   *
   * @param family column family this prefix is stored in.
   * @param prefix prefix for the column, possibly null.
   */
  ApplicationColumnPrefix(ColumnFamily<ApplicationTable> family,
      String prefix) {
    this(family, prefix, GenericConverter.getInstance());
  }

  /**
   * Creates a prefix with an explicit value converter.
   *
   * @param family column family this prefix is stored in.
   * @param prefix prefix for the column, possibly null.
   * @param valueConverter used to encode/decode values to be stored in HBase
   *          for this column prefix.
   */
  ApplicationColumnPrefix(ColumnFamily<ApplicationTable> family,
      String prefix, ValueConverter valueConverter) {
    this.column = new ColumnHelper<>(family, valueConverter);
    this.columnFamily = family;
    this.columnPrefix = prefix;
    // A null prefix stays null; otherwise the prefix goes through the SPACE
    // separator encoding before conversion to bytes.
    this.columnPrefixBytes = (prefix == null)
        ? null
        : Bytes.toBytes(Separator.SPACE.encode(prefix));
  }

  /**
   * @return the column name value
   */
  private String getColumnPrefix() {
    return columnPrefix;
  }

  /**
   * @param qualifierPrefix qualifier bytes to combine with this prefix.
   * @return the column qualifier built by the column helper.
   */
  @Override
  public byte[] getColumnPrefixBytes(byte[] qualifierPrefix) {
    return ColumnHelper.getColumnQualifier(columnPrefixBytes, qualifierPrefix);
  }

  /**
   * @param qualifierPrefix qualifier string to combine with this prefix.
   * @return the column qualifier built by the column helper.
   */
  @Override
  public byte[] getColumnPrefixBytes(String qualifierPrefix) {
    return ColumnHelper.getColumnQualifier(columnPrefixBytes, qualifierPrefix);
  }

  /**
   * @return the bytes of the column family this prefix belongs to.
   */
  @Override
  public byte[] getColumnFamilyBytes() {
    return columnFamily.getBytes();
  }

  /**
   * Rejects a null qualifier before a store.
   *
   * @param qualifier qualifier supplied by the caller.
   * @param tableMutator mutator whose table name is used in the message.
   * @throws IOException if {@code qualifier} is null.
   */
  private static void rejectNullQualifier(Object qualifier,
      TypedBufferedMutator<ApplicationTable> tableMutator) throws IOException {
    if (qualifier == null) {
      throw new IOException("Cannot store column with null qualifier in "
          + tableMutator.getName().getNameAsString());
    }
  }

  /**
   * Stores a value under this prefix combined with a byte qualifier.
   *
   * @param rowKey row to write to.
   * @param tableMutator mutator used for the write.
   * @param qualifier qualifier bytes appended to this prefix; must not be
   *          null.
   * @param timestamp timestamp passed through to the helper.
   * @param inputValue value passed through to the helper.
   * @param attributes attributes passed through to the helper.
   * @throws IOException if {@code qualifier} is null or the helper fails.
   */
  public void store(byte[] rowKey,
      TypedBufferedMutator<ApplicationTable> tableMutator, byte[] qualifier,
      Long timestamp, Object inputValue, Attribute... attributes)
      throws IOException {
    rejectNullQualifier(qualifier, tableMutator);
    byte[] fullQualifier =
        ColumnHelper.getColumnQualifier(columnPrefixBytes, qualifier);
    column.store(rowKey, tableMutator, fullQualifier, timestamp, inputValue,
        attributes);
  }

  /**
   * Stores a value under this prefix combined with a string qualifier.
   *
   * @param rowKey row to write to.
   * @param tableMutator mutator used for the write.
   * @param qualifier qualifier string appended to this prefix; must not be
   *          null.
   * @param timestamp timestamp passed through to the helper.
   * @param inputValue value passed through to the helper.
   * @param attributes attributes passed through to the helper.
   * @throws IOException if {@code qualifier} is null or the helper fails.
   */
  public void store(byte[] rowKey,
      TypedBufferedMutator<ApplicationTable> tableMutator, String qualifier,
      Long timestamp, Object inputValue, Attribute...attributes)
      throws IOException {
    rejectNullQualifier(qualifier, tableMutator);
    byte[] fullQualifier =
        ColumnHelper.getColumnQualifier(columnPrefixBytes, qualifier);
    column.store(rowKey, tableMutator, fullQualifier, timestamp, inputValue,
        attributes);
  }

  /**
   * Reads the single column formed by this prefix and the given qualifier.
   *
   * @param result result to read from.
   * @param qualifier qualifier string appended to this prefix.
   * @return whatever the column helper returns for that column.
   * @throws IOException if the helper fails to read the value.
   */
  public Object readResult(Result result, String qualifier) throws IOException {
    return column.readResult(result, getColumnPrefixBytes(qualifier));
  }

  /**
   * @return the value converter held by the column helper.
   */
  public ValueConverter getValueConverter() {
    return column.getValueConverter();
  }

  /**
   * Reads the columns under this prefix by delegating to the column helper.
   *
   * @param <K> key type produced by the key converter.
   * @param result result to read from.
   * @param keyConverter converter handed to the helper.
   * @return the map produced by the helper.
   * @throws IOException if the helper fails to read the values.
   */
  public <K> Map<K, Object> readResults(Result result,
      KeyConverter<K> keyConverter) throws IOException {
    return column.readResults(result, columnPrefixBytes, keyConverter);
  }

  /**
   * Reads the timestamped values under this prefix by delegating to the
   * column helper.
   *
   * @param <K> key type produced by the key converter.
   * @param <V> value type of the returned cells.
   * @param result result to read from.
   * @param keyConverter converter handed to the helper.
   * @return the nested map produced by the helper.
   * @throws IOException if the helper fails to read the values.
   */
  public <K, V> NavigableMap<K, NavigableMap<Long, V>>
      readResultsWithTimestamps(Result result, KeyConverter<K> keyConverter)
      throws IOException {
    return column.readResultsWithTimestamps(result, columnPrefixBytes,
        keyConverter);
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java deleted file mode 100644 index e89a6a7..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKey.java +++ /dev/null @@ -1,251 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */

package org.apache.hadoop.yarn.server.timelineservice.storage.application;

import java.util.List;

import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderUtils;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.AppIdKeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.KeyConverterToString;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.LongConverter;
import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator;

/**
 * Represents a rowkey for the application table.
 */
public class ApplicationRowKey {
  private final String clusterId;
  private final String userId;
  private final String flowName;
  private final Long flowRunId;
  private final String appId;
  private final ApplicationRowKeyConverter appRowKeyConverter =
      new ApplicationRowKeyConverter();

  /**
   * Creates a row key from its five components. No validation is done here;
   * null components are interpreted by the encoding methods.
   *
   * @param clusterId the cluster component.
   * @param userId the user component.
   * @param flowName the flow name component.
   * @param flowRunId the flow run id component.
   * @param appId the application id component.
   */
  public ApplicationRowKey(String clusterId, String userId, String flowName,
      Long flowRunId, String appId) {
    this.clusterId = clusterId;
    this.userId = userId;
    this.flowName = flowName;
    this.flowRunId = flowRunId;
    this.appId = appId;
  }

  public String getClusterId() {
    return clusterId;
  }

  public String getUserId() {
    return userId;
  }

  public String getFlowName() {
    return flowName;
  }

  public Long getFlowRunId() {
    return flowRunId;
  }

  public String getAppId() {
    return appId;
  }

  /**
   * Constructs a row key for the application table as follows:
   * {@code clusterId!userName!flowName!flowRunId!AppId}.
   *
   * @return byte array with the row key
   */
  public byte[] getRowKey() {
    return appRowKeyConverter.encode(this);
  }

  /**
   * Given the raw row key as bytes, returns the row key as an object.
   *
   * @param rowKey Byte representation of row key.
   * @return An <cite>ApplicationRowKey</cite> object.
   */
  public static ApplicationRowKey parseRowKey(byte[] rowKey) {
    return new ApplicationRowKeyConverter().decode(rowKey);
  }

  /**
   * Constructs a row key for the application table as follows:
   * {@code clusterId!userName!flowName!flowRunId!AppId}.
   *
   * @return String representation of row key.
   * @throws IllegalArgumentException if any of the five components is null.
   */
  public String getRowKeyAsString() {
    return appRowKeyConverter.encodeAsString(this);
  }

  /**
   * Given the encoded row key as string, returns the row key as an object.
   *
   * @param encodedRowKey String representation of row key.
   * @return A <cite>ApplicationRowKey</cite> object.
   * @throws IllegalArgumentException if the string does not split into
   *           exactly five parts.
   */
  public static ApplicationRowKey parseRowKeyFromString(String encodedRowKey) {
    return new ApplicationRowKeyConverter().decodeFromString(encodedRowKey);
  }

  /**
   * Encodes and decodes row key for application table. The row key is of the
   * form: clusterId!userName!flowName!flowRunId!appId. flowRunId is a long,
   * appId is encoded and decoded using {@link AppIdKeyConverter} and rest are
   * strings.
   */
  private static final class ApplicationRowKeyConverter implements
      KeyConverter<ApplicationRowKey>, KeyConverterToString<ApplicationRowKey> {

    private final KeyConverter<String> appIDKeyConverter =
        new AppIdKeyConverter();

    /**
     * Intended for use in ApplicationRowKey only.
     */
    private ApplicationRowKeyConverter() {
    }

    /**
     * Application row key is of the form
     * clusterId!userName!flowName!flowRunId!appId with each segment separated
     * by !. The sizes below indicate sizes of each one of these segments in
     * sequence. clusterId, userName and flowName are strings. flowrunId is a
     * long hence 8 bytes in size. app id is represented as 12 bytes with
     * cluster timestamp part of appid takes 8 bytes(long) and seq id takes 4
     * bytes(int). Strings are variable in size (i.e. end whenever separator is
     * encountered). This is used while decoding and helps in determining where
     * to split.
     */
    private static final int[] SEGMENT_SIZES = {Separator.VARIABLE_SIZE,
        Separator.VARIABLE_SIZE, Separator.VARIABLE_SIZE, Bytes.SIZEOF_LONG,
        AppIdKeyConverter.getKeySize() };

    /**
     * Encodes ApplicationRowKey object into a byte array with each
     * component/field in ApplicationRowKey separated by Separator#QUALIFIERS.
     * This leads to an application table row key of the form
     * clusterId!userName!flowName!flowRunId!appId. If flowRunId in passed
     * ApplicationRowKey object is null, this returns a row key prefix of the
     * form clusterId!userName!flowName! and if appId in ApplicationRowKey is
     * null or empty (with flowRunId present), this returns a row key prefix of
     * the form clusterId!userName!flowName!flowRunId!. flowRunId is inverted
     * while encoding as it helps maintain a descending order for row keys in
     * the application table.
     *
     * @param rowKey the row key (or row key prefix) to encode.
     * @return the encoded bytes.
     */
    @Override
    public byte[] encode(ApplicationRowKey rowKey) {
      byte[] cluster =
          Separator.encode(rowKey.getClusterId(), Separator.SPACE,
              Separator.TAB, Separator.QUALIFIERS);
      byte[] user =
          Separator.encode(rowKey.getUserId(), Separator.SPACE, Separator.TAB,
              Separator.QUALIFIERS);
      byte[] flow =
          Separator.encode(rowKey.getFlowName(), Separator.SPACE,
              Separator.TAB, Separator.QUALIFIERS);
      byte[] first = Separator.QUALIFIERS.join(cluster, user, flow);
      // Note that flowRunId is a long, so we can't encode them all at the same
      // time.
      if (rowKey.getFlowRunId() == null) {
        // Prefix ending right after the flow name.
        return Separator.QUALIFIERS.join(first, Separator.EMPTY_BYTES);
      }
      byte[] second =
          Bytes.toBytes(LongConverter.invertLong(
              rowKey.getFlowRunId()));
      if (rowKey.getAppId() == null || rowKey.getAppId().isEmpty()) {
        // Prefix ending right after the flow run id.
        return Separator.QUALIFIERS.join(first, second, Separator.EMPTY_BYTES);
      }
      byte[] third = appIDKeyConverter.encode(rowKey.getAppId());
      return Separator.QUALIFIERS.join(first, second, third);
    }

    /**
     * Decodes an application row key of the form
     * clusterId!userName!flowName!flowRunId!appId represented in byte format
     * and converts it into an ApplicationRowKey object. flowRunId is inverted
     * while decoding as it was inverted while encoding.
     *
     * @param rowKey the encoded row key bytes.
     * @return the decoded row key.
     * @throws IllegalArgumentException if the bytes do not split into exactly
     *           five components.
     */
    @Override
    public ApplicationRowKey decode(byte[] rowKey) {
      byte[][] rowKeyComponents =
          Separator.QUALIFIERS.split(rowKey, SEGMENT_SIZES);
      if (rowKeyComponents.length != 5) {
        throw new IllegalArgumentException("the row key is not valid for "
            + "an application");
      }
      String clusterId =
          Separator.decode(Bytes.toString(rowKeyComponents[0]),
              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
      String userId =
          Separator.decode(Bytes.toString(rowKeyComponents[1]),
              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
      String flowName =
          Separator.decode(Bytes.toString(rowKeyComponents[2]),
              Separator.QUALIFIERS, Separator.TAB, Separator.SPACE);
      Long flowRunId =
          LongConverter.invertLong(Bytes.toLong(rowKeyComponents[3]));
      String appId = appIDKeyConverter.decode(rowKeyComponents[4]);
      return new ApplicationRowKey(clusterId, userId, flowName, flowRunId,
          appId);
    }

    /**
     * Joins the five components into one escaped string.
     *
     * @param key the row key to encode; all five components must be set.
     * @return the joined and escaped string form of the key.
     * @throws IllegalArgumentException if any component is null.
     */
    @Override
    public String encodeAsString(ApplicationRowKey key) {
      if (key.clusterId == null || key.userId == null || key.flowName == null
          || key.flowRunId == null || key.appId == null) {
        // Tell the caller what was wrong instead of throwing a bare exception.
        throw new IllegalArgumentException(
            "Cannot encode application row key as string: clusterId, userId, "
            + "flowName, flowRunId and appId must all be non-null");
      }
      return TimelineReaderUtils
          .joinAndEscapeStrings(new String[] {key.clusterId, key.userId,
              key.flowName, key.flowRunId.toString(), key.appId});
    }

    /**
     * Splits an encoded string back into the five row key components.
     *
     * @param encodedRowKey string produced by {@link #encodeAsString}.
     * @return the decoded row key.
     * @throws IllegalArgumentException if the string does not split into
     *           exactly five parts.
     */
    @Override
    public ApplicationRowKey decodeFromString(String encodedRowKey) {
      List<String> split = TimelineReaderUtils.split(encodedRowKey);
      if (split == null || split.size() != 5) {
        throw new IllegalArgumentException(
            "Invalid row key for application table.");
      }
      Long flowRunId = Long.valueOf(split.get(3));
      return new ApplicationRowKey(split.get(0), split.get(1), split.get(2),
          flowRunId, split.get(4));
    }
  }

}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java deleted file mode 100644 index f61b0e9..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationRowKeyPrefix.java +++ /dev/null @@ -1,69 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.application; - -import org.apache.hadoop.yarn.server.timelineservice.storage.common.RowKeyPrefix; - -/** - * Represents a partial rowkey (without flowName or without flowName and - * flowRunId) for the application table. - */ -public class ApplicationRowKeyPrefix extends ApplicationRowKey implements - RowKeyPrefix<ApplicationRowKey> { - - /** - * Creates a prefix which generates the following rowKeyPrefixes for the - * application table: {@code clusterId!userName!flowName!}. - * - * @param clusterId the cluster on which applications ran - * @param userId the user that ran applications - * @param flowName the name of the flow that was run by the user on the - * cluster - */ - public ApplicationRowKeyPrefix(String clusterId, String userId, - String flowName) { - super(clusterId, userId, flowName, null, null); - } - - /** - * Creates a prefix which generates the following rowKeyPrefixes for the - * application table: {@code clusterId!userName!flowName!flowRunId!}. 
- * - * @param clusterId identifying the cluster - * @param userId identifying the user - * @param flowName identifying the flow - * @param flowRunId identifying the instance of this flow - */ - public ApplicationRowKeyPrefix(String clusterId, String userId, - String flowName, Long flowRunId) { - super(clusterId, userId, flowName, flowRunId, null); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.application. - * RowKeyPrefix#getRowKeyPrefix() - */ - @Override - public byte[] getRowKeyPrefix() { - return super.getRowKey(); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java deleted file mode 100644 index 4da720e..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/ApplicationTable.java +++ /dev/null @@ -1,170 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.application; - -import java.io.IOException; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.regionserver.BloomType; -import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.BaseTable; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineHBaseSchemaConstants; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * The application table as column families info, config and metrics. Info - * stores information about a YARN application entity, config stores - * configuration data of a YARN application, metrics stores the metrics of a - * YARN application. This table is entirely analogous to the entity table but - * created for better performance. - * - * Example application table record: - * - * <pre> - * |-------------------------------------------------------------------------| - * | Row | Column Family | Column Family| Column Family| - * | key | info | metrics | config | - * |-------------------------------------------------------------------------| - * | clusterId! | id:appId | metricId1: | configKey1: | - * | userName! | | metricValue1 | configValue1 | - * | flowName! | created_time: | @timestamp1 | | - * | flowRunId! 
| 1392993084018 | | configKey2: | - * | AppId | | metriciD1: | configValue2 | - * | | i!infoKey: | metricValue2 | | - * | | infoValue | @timestamp2 | | - * | | | | | - * | | r!relatesToKey: | metricId2: | | - * | | id3=id4=id5 | metricValue1 | | - * | | | @timestamp2 | | - * | | s!isRelatedToKey: | | | - * | | id7=id9=id6 | | | - * | | | | | - * | | e!eventId=timestamp=infoKey: | | | - * | | eventInfoValue | | | - * | | | | | - * | | flowVersion: | | | - * | | versionValue | | | - * |-------------------------------------------------------------------------| - * </pre> - */ -public class ApplicationTable extends BaseTable<ApplicationTable> { - /** application prefix. */ - private static final String PREFIX = - YarnConfiguration.TIMELINE_SERVICE_PREFIX + "application"; - - /** config param name that specifies the application table name. */ - public static final String TABLE_NAME_CONF_NAME = PREFIX + ".table.name"; - - /** - * config param name that specifies the TTL for metrics column family in - * application table. - */ - private static final String METRICS_TTL_CONF_NAME = PREFIX - + ".table.metrics.ttl"; - - /** - * config param name that specifies max-versions for metrics column family in - * entity table. - */ - private static final String METRICS_MAX_VERSIONS = - PREFIX + ".table.metrics.max-versions"; - - /** default value for application table name. */ - private static final String DEFAULT_TABLE_NAME = - "timelineservice.application"; - - /** default TTL is 30 days for metrics timeseries. */ - private static final int DEFAULT_METRICS_TTL = 2592000; - - /** default max number of versions. 
*/ - private static final int DEFAULT_METRICS_MAX_VERSIONS = 10000; - - private static final Logger LOG = - LoggerFactory.getLogger(ApplicationTable.class); - - public ApplicationTable() { - super(TABLE_NAME_CONF_NAME, DEFAULT_TABLE_NAME); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.hadoop.yarn.server.timelineservice.storage.BaseTable#createTable - * (org.apache.hadoop.hbase.client.Admin, - * org.apache.hadoop.conf.Configuration) - */ - public void createTable(Admin admin, Configuration hbaseConf) - throws IOException { - - TableName table = getTableName(hbaseConf); - if (admin.tableExists(table)) { - // do not disable / delete existing table - // similar to the approach taken by map-reduce jobs when - // output directory exists - throw new IOException("Table " + table.getNameAsString() - + " already exists."); - } - - HTableDescriptor applicationTableDescp = new HTableDescriptor(table); - HColumnDescriptor infoCF = - new HColumnDescriptor(ApplicationColumnFamily.INFO.getBytes()); - infoCF.setBloomFilterType(BloomType.ROWCOL); - applicationTableDescp.addFamily(infoCF); - - HColumnDescriptor configCF = - new HColumnDescriptor(ApplicationColumnFamily.CONFIGS.getBytes()); - configCF.setBloomFilterType(BloomType.ROWCOL); - configCF.setBlockCacheEnabled(true); - applicationTableDescp.addFamily(configCF); - - HColumnDescriptor metricsCF = - new HColumnDescriptor(ApplicationColumnFamily.METRICS.getBytes()); - applicationTableDescp.addFamily(metricsCF); - metricsCF.setBlockCacheEnabled(true); - // always keep 1 version (the latest) - metricsCF.setMinVersions(1); - metricsCF.setMaxVersions( - hbaseConf.getInt(METRICS_MAX_VERSIONS, DEFAULT_METRICS_MAX_VERSIONS)); - metricsCF.setTimeToLive(hbaseConf.getInt(METRICS_TTL_CONF_NAME, - DEFAULT_METRICS_TTL)); - applicationTableDescp.setRegionSplitPolicyClassName( - "org.apache.hadoop.hbase.regionserver.KeyPrefixRegionSplitPolicy"); - applicationTableDescp.setValue("KeyPrefixRegionSplitPolicy.prefix_length", - 
TimelineHBaseSchemaConstants.USERNAME_SPLIT_KEY_PREFIX_LENGTH); - admin.createTable(applicationTableDescp, - TimelineHBaseSchemaConstants.getUsernameSplits()); - LOG.info("Status of table creation for " + table.getNameAsString() + "=" - + admin.tableExists(table)); - } - - /** - * @param metricsTTL time to live parameter for the metrics in this table. - * @param hbaseConf configuration in which to set the metrics TTL config - * variable. - */ - public void setMetricsTTL(int metricsTTL, Configuration hbaseConf) { - hbaseConf.setInt(METRICS_TTL_CONF_NAME, metricsTTL); - } - -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java deleted file mode 100644 index 03f508f..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/application/package-info.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * Package org.apache.hadoop.yarn.server.timelineservice.storage.application - * contains classes related to implementation for application table. - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -package org.apache.hadoop.yarn.server.timelineservice.storage.application; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; http://git-wip-us.apache.org/repos/asf/hadoop/blob/9af30d46/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java deleted file mode 100644 index 67497fc..0000000 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/apptoflow/AppToFlowColumn.java +++ /dev/null @@ -1,101 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.yarn.server.timelineservice.storage.apptoflow; - - -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Column; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnFamily; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnHelper; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.Separator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.TypedBufferedMutator; -import org.apache.hadoop.yarn.server.timelineservice.storage.common.ValueConverter; -import org.apache.hadoop.yarn.server.timelineservice.storage.flow.Attribute; - -import java.io.IOException; - -/** - * Identifies fully qualified columns for the {@link AppToFlowTable}. - */ -public enum AppToFlowColumn implements Column<AppToFlowTable> { - - /** - * The flow ID. - */ - FLOW_ID(AppToFlowColumnFamily.MAPPING, "flow_id"), - - /** - * The flow run ID. - */ - FLOW_RUN_ID(AppToFlowColumnFamily.MAPPING, "flow_run_id"), - - /** - * The user. 
- */ - USER_ID(AppToFlowColumnFamily.MAPPING, "user_id"); - - private final ColumnHelper<AppToFlowTable> column; - private final ColumnFamily<AppToFlowTable> columnFamily; - private final String columnQualifier; - private final byte[] columnQualifierBytes; - - AppToFlowColumn(ColumnFamily<AppToFlowTable> columnFamily, - String columnQualifier) { - this.columnFamily = columnFamily; - this.columnQualifier = columnQualifier; - // Future-proof by ensuring the right column prefix hygiene. - this.columnQualifierBytes = - Bytes.toBytes(Separator.SPACE.encode(columnQualifier)); - this.column = new ColumnHelper<AppToFlowTable>(columnFamily); - } - - /** - * @return the column name value - */ - private String getColumnQualifier() { - return columnQualifier; - } - - @Override - public byte[] getColumnQualifierBytes() { - return columnQualifierBytes.clone(); - } - - public void store(byte[] rowKey, - TypedBufferedMutator<AppToFlowTable> tableMutator, Long timestamp, - Object inputValue, Attribute... attributes) throws IOException { - column.store(rowKey, tableMutator, columnQualifierBytes, timestamp, - inputValue, attributes); - } - - @Override - public byte[] getColumnFamilyBytes() { - return columnFamily.getBytes(); - } - - @Override - public ValueConverter getValueConverter() { - return column.getValueConverter(); - } - - public Object readResult(Result result) throws IOException { - return column.readResult(result, columnQualifierBytes); - } - -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
