http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForNumericFilters.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForNumericFilters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForNumericFilters.java new file mode 100644 index 0000000..7c14a9f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForNumericFilters.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter; +import org.apache.hadoop.yarn.server.timelineservice.storage.common.TimelineStorageUtils; + +/** + * Used for parsing numerical filters such as metric filters. + */ +@Private +@Unstable +class TimelineParserForNumericFilters extends TimelineParserForCompareExpr { + + public TimelineParserForNumericFilters(String expression) { + super(expression, "Metric Filter"); + } + + protected TimelineFilter createFilter() { + return new TimelineCompareFilter(); + } + + @Override + protected void setCompareOpToCurrentFilter(TimelineCompareOp compareOp, + boolean keyMustExistFlag) { + ((TimelineCompareFilter)getCurrentFilter()).setCompareOp( + compareOp, keyMustExistFlag); + } + + protected Object parseValue(String strValue) throws TimelineParseException { + Object value = null; + try { + value = GenericObjectMapper.OBJECT_READER.readValue(strValue); + } catch (IOException e) { + throw new TimelineParseException("Value cannot be parsed."); + } + if (value == null || !(TimelineStorageUtils.isIntegralValue(value))) { + throw new TimelineParseException("Value is not a number."); + } + return value; + } + + protected void setValueToCurrentFilter(Object value) { + TimelineFilter currentFilter = getCurrentFilter(); + if (currentFilter != null) { + ((TimelineCompareFilter)currentFilter).setValue(value); + } + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForRelationFilters.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForRelationFilters.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForRelationFilters.java new file mode 100644 index 0000000..cde11e4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineParserForRelationFilters.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineCompareOp; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineFilter; +import org.apache.hadoop.yarn.server.timelineservice.reader.filter.TimelineKeyValuesFilter; + +/** + * Used for parsing relation filters. + */ +@Private +@Unstable +class TimelineParserForRelationFilters extends + TimelineParserForEqualityExpr { + private final String valueDelimiter; + public TimelineParserForRelationFilters(String expression, char valuesDelim, + String valueDelim) { + super(expression, "Relation Filter", valuesDelim); + valueDelimiter = valueDelim; + } + + @Override + protected TimelineFilter createFilter() { + return new TimelineKeyValuesFilter(); + } + + @Override + protected void setCompareOpToCurrentFilter(TimelineCompareOp compareOp) { + ((TimelineKeyValuesFilter)getCurrentFilter()).setCompareOp(compareOp); + } + + @Override + protected void setValueToCurrentFilter(String value) + throws TimelineParseException { + if (value != null) { + String[] pairStrs = value.split(valueDelimiter); + if (pairStrs.length < 2) { + throw new TimelineParseException("Invalid relation filter expression"); + } + String key = pairStrs[0].trim(); + Set<Object> values = new HashSet<Object>(); + for (int i = 1; i < pairStrs.length; i++) { + values.add(pairStrs[i].trim()); + } + ((TimelineKeyValuesFilter)getCurrentFilter()). + setKeyAndValues(key, values); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java new file mode 100644 index 0000000..633bb23 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderContext.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.server.timelineservice.TimelineContext; + +/** + * Encapsulates fields necessary to make a query in timeline reader. + */ +@Private +@Unstable +public class TimelineReaderContext extends TimelineContext { + + private String entityType; + private String entityId; + public TimelineReaderContext(String clusterId, String userId, String flowName, + Long flowRunId, String appId, String entityType, String entityId) { + super(clusterId, userId, flowName, flowRunId, appId); + this.entityType = entityType; + this.entityId = entityId; + } + + public TimelineReaderContext(TimelineReaderContext other) { + this(other.getClusterId(), other.getUserId(), other.getFlowName(), + other.getFlowRunId(), other.getAppId(), other.getEntityType(), + other.getEntityId()); + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + ((entityId == null) ? 0 : entityId.hashCode()); + result = + prime * result + ((entityType == null) ? 0 : entityType.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + TimelineReaderContext other = (TimelineReaderContext) obj; + if (entityId == null) { + if (other.entityId != null) { + return false; + } + } else if (!entityId.equals(other.entityId)) { + return false; + } + if (entityType == null) { + if (other.entityType != null) { + return false; + } + } else if (!entityType.equals(other.entityType)) { + return false; + } + return true; + } + + public String getEntityType() { + return entityType; + } + + public void setEntityType(String type) { + this.entityType = type; + } + + public String getEntityId() { + return entityId; + } + + public void setEntityId(String id) { + this.entityId = id; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java new file mode 100644 index 0000000..4cff3bc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderManager.java @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import java.io.IOException; +import java.util.Set; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; + +import com.google.common.annotations.VisibleForTesting; + +/** + * This class wraps over the timeline reader store implementation. It does some + * non trivial manipulation of the timeline data before or after getting + * it from the backend store. + */ +@Private +@Unstable +public class TimelineReaderManager extends AbstractService { + + @VisibleForTesting + public static final String UID_KEY = "UID"; + private TimelineReader reader; + + public TimelineReaderManager(TimelineReader timelineReader) { + super(TimelineReaderManager.class.getName()); + this.reader = timelineReader; + } + + /** + * Gets cluster ID from config yarn.resourcemanager.cluster-id + * if not supplied by client. + * @param clusterId + * @param conf + * @return clusterId + */ + private static String getClusterID(String clusterId, Configuration conf) { + if (clusterId == null || clusterId.isEmpty()) { + return conf.get( + YarnConfiguration.RM_CLUSTER_ID, + YarnConfiguration.DEFAULT_RM_CLUSTER_ID); + } + return clusterId; + } + + private static TimelineEntityType getTimelineEntityType(String entityType) { + if (entityType == null) { + return null; + } + try { + return TimelineEntityType.valueOf(entityType); + } catch (IllegalArgumentException e) { + return null; + } + } + + /** + * Fill UID in the info field of entity based on the query(identified by + * entity type). + * @param entityType Entity type of query. + * @param entity Timeline Entity. + * @param context Context defining the query. + */ + private static void fillUID(TimelineEntityType entityType, + TimelineEntity entity, TimelineReaderContext context) { + if (entityType != null) { + switch(entityType) { + case YARN_FLOW_ACTIVITY: + FlowActivityEntity activityEntity = (FlowActivityEntity)entity; + context.setUserId(activityEntity.getUser()); + context.setFlowName(activityEntity.getFlowName()); + entity.setUID(UID_KEY, + TimelineUIDConverter.FLOW_UID.encodeUID(context)); + return; + case YARN_FLOW_RUN: + FlowRunEntity runEntity = (FlowRunEntity)entity; + context.setFlowRunId(runEntity.getRunId()); + entity.setUID(UID_KEY, + TimelineUIDConverter.FLOWRUN_UID.encodeUID(context)); + return; + case YARN_APPLICATION: + context.setAppId(entity.getId()); + entity.setUID(UID_KEY, + TimelineUIDConverter.APPLICATION_UID.encodeUID(context)); + return; + default: + break; + } + } + context.setEntityType(entity.getType()); + context.setEntityId(entity.getId()); + entity.setUID(UID_KEY, + TimelineUIDConverter.GENERIC_ENTITY_UID.encodeUID(context)); + } + + /** + * Get a set of entities matching given predicates by making a call to + * backend storage implementation. The meaning of each argument has been + * documented in detail with {@link TimelineReader#getEntities}.If cluster ID + * has not been supplied by the client, fills the cluster id from config + * before making a call to backend storage. After fetching entities from + * backend, fills the appropriate UID based on entity type for each entity. + * + * @param context Timeline context within the scope of which entities have to + * be fetched. + * @param filters Filters which limit the number of entities to be returned. + * @param dataToRetrieve Data to carry in each entity fetched. + * @return a set of <cite>TimelineEntity</cite> objects. + * @throws IOException if any problem occurs while getting entities. + * @see TimelineReader#getEntities + */ + public Set<TimelineEntity> getEntities(TimelineReaderContext context, + TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) + throws IOException { + context.setClusterId(getClusterID(context.getClusterId(), getConfig())); + Set<TimelineEntity> entities = reader.getEntities( + new TimelineReaderContext(context), filters, dataToRetrieve); + if (entities != null) { + TimelineEntityType type = getTimelineEntityType(context.getEntityType()); + for (TimelineEntity entity : entities) { + fillUID(type, entity, context); + } + } + return entities; + } + + /** + * Get single timeline entity by making a call to backend storage + * implementation. The meaning of each argument in detail has been + * documented with {@link TimelineReader#getEntity}. If cluster ID has not + * been supplied by the client, fills the cluster id from config before making + * a call to backend storage. After fetching entity from backend, fills the + * appropriate UID based on entity type. + * + * @param context Timeline context within the scope of which entity has to be + * fetched. + * @param dataToRetrieve Data to carry in the entity fetched. + * @return A <cite>TimelineEntity</cite> object if found, null otherwise. + * @throws IOException if any problem occurs while getting entity. + * @see TimelineReader#getEntity + */ + public TimelineEntity getEntity(TimelineReaderContext context, + TimelineDataToRetrieve dataToRetrieve) throws IOException { + context.setClusterId( + getClusterID(context.getClusterId(), getConfig())); + TimelineEntity entity = reader.getEntity( + new TimelineReaderContext(context), dataToRetrieve); + if (entity != null) { + TimelineEntityType type = getTimelineEntityType(context.getEntityType()); + fillUID(type, entity, context); + } + return entity; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java new file mode 100644 index 0000000..110d1dc --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderServer.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import static org.apache.hadoop.fs.CommonConfigurationKeys.DEFAULT_HADOOP_HTTP_STATIC_USER; +import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_HTTP_STATIC_USER; + +import java.net.URI; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.http.HttpServer2; +import org.apache.hadoop.http.lib.StaticUserWebFilter; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.util.ExitUtil; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.ShutdownHookManager; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; +import org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl; +import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader; +import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; +import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider; +import org.apache.hadoop.yarn.webapp.util.WebAppUtils; + +import com.google.common.annotations.VisibleForTesting; + +/** Main class for Timeline Reader. */ +@Private +@Unstable +public class TimelineReaderServer extends CompositeService { + private static final Log LOG = LogFactory.getLog(TimelineReaderServer.class); + private static final int SHUTDOWN_HOOK_PRIORITY = 30; + static final String TIMELINE_READER_MANAGER_ATTR = + "timeline.reader.manager"; + + private HttpServer2 readerWebServer; + private TimelineReaderManager timelineReaderManager; + + public TimelineReaderServer() { + super(TimelineReaderServer.class.getName()); + } + + @Override + protected void serviceInit(Configuration conf) throws Exception { + if (!YarnConfiguration.timelineServiceV2Enabled(conf)) { + throw new YarnException("timeline service v.2 is not enabled"); + } + + TimelineReader timelineReaderStore = createTimelineReaderStore(conf); + addService(timelineReaderStore); + timelineReaderManager = createTimelineReaderManager(timelineReaderStore); + addService(timelineReaderManager); + super.serviceInit(conf); + } + + private TimelineReader createTimelineReaderStore(Configuration conf) { + TimelineReader readerStore = ReflectionUtils.newInstance(conf.getClass( + YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, + HBaseTimelineReaderImpl.class, TimelineReader.class), conf); + LOG.info("Using store " + readerStore.getClass().getName()); + readerStore.init(conf); + return readerStore; + } + + private TimelineReaderManager createTimelineReaderManager( + TimelineReader timelineReaderStore) { + return new TimelineReaderManager(timelineReaderStore); + } + + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + startTimelineReaderWebApp(); + } + + @Override + protected void serviceStop() throws Exception { + if (readerWebServer != null) { + readerWebServer.stop(); + } + super.serviceStop(); + } + + private void startTimelineReaderWebApp() { + Configuration conf = getConfig(); + String bindAddress = WebAppUtils.getWebAppBindURL(conf, + YarnConfiguration.TIMELINE_SERVICE_BIND_HOST, + WebAppUtils.getTimelineReaderWebAppURL(conf)); + LOG.info("Instantiating TimelineReaderWebApp at " + bindAddress); + try { + HttpServer2.Builder builder = new HttpServer2.Builder() + .setName("timeline") + .setConf(conf) + .addEndpoint(URI.create("http://" + bindAddress)); + readerWebServer = builder.build(); + + Map<String, String> options = new HashMap<>(); + String username = conf.get(HADOOP_HTTP_STATIC_USER, + DEFAULT_HADOOP_HTTP_STATIC_USER); + options.put(HADOOP_HTTP_STATIC_USER, username); + HttpServer2.defineFilter(readerWebServer.getWebAppContext(), + "static_user_filter_timeline", + StaticUserWebFilter.StaticUserFilter.class.getName(), + options, new String[] {"/*"}); + + readerWebServer.addJerseyResourcePackage( + TimelineReaderWebServices.class.getPackage().getName() + ";" + + GenericExceptionHandler.class.getPackage().getName() + ";" + + YarnJacksonJaxbJsonProvider.class.getPackage().getName(), + "/*"); + readerWebServer.setAttribute(TIMELINE_READER_MANAGER_ATTR, + timelineReaderManager); + readerWebServer.start(); + } catch (Exception e) { + String msg = "TimelineReaderWebApp failed to start."; + LOG.error(msg, e); + throw new YarnRuntimeException(msg, e); + } + } + + @VisibleForTesting + int getWebServerPort() { + return readerWebServer.getConnectorAddress(0).getPort(); + } + + static TimelineReaderServer startTimelineReaderServer(String[] args, + Configuration conf) { + Thread.setDefaultUncaughtExceptionHandler( + new YarnUncaughtExceptionHandler()); + StringUtils.startupShutdownMessage(TimelineReaderServer.class, + args, LOG); + TimelineReaderServer timelineReaderServer = null; + try { + timelineReaderServer = new TimelineReaderServer(); + ShutdownHookManager.get().addShutdownHook( + new CompositeServiceShutdownHook(timelineReaderServer), + SHUTDOWN_HOOK_PRIORITY); + timelineReaderServer.init(conf); + timelineReaderServer.start(); + } catch (Throwable t) { + LOG.fatal("Error starting TimelineReaderWebServer", t); + ExitUtil.terminate(-1, "Error starting TimelineReaderWebServer"); + } + return timelineReaderServer; + } + + public static void main(String[] args) { + Configuration conf = new YarnConfiguration(); + conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + startTimelineReaderServer(args, conf); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bd32c28b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java ---------------------------------------------------------------------- diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java new file mode 100644 index 0000000..c93c631 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/reader/TimelineReaderUtils.java @@ -0,0 +1,171 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.timelineservice.reader; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.lang.StringUtils; + +/** + * Set of utility methods to be used across timeline reader. + */ +final class TimelineReaderUtils { + private TimelineReaderUtils() { + } + + /** + * Split the passed string along the passed delimiter character while looking + * for escape char to interpret the splitted parts correctly. For delimiter or + * escape character to be interpreted as part of the string, they have to be + * escaped by putting an escape character in front. + * @param str string to be split. + * @param delimiterChar delimiter used for splitting. + * @param escapeChar delimiter and escape character will be escaped using this + * character. + * @return a list of strings after split. + * @throws IllegalArgumentException if string is not properly escaped. + */ + static List<String> split(final String str, final char delimiterChar, + final char escapeChar) throws IllegalArgumentException { + if (str == null) { + return null; + } + int len = str.length(); + if (len == 0) { + return Collections.emptyList(); + } + List<String> list = new ArrayList<String>(); + // Keeps track of offset of the passed string. + int offset = 0; + // Indicates start offset from which characters will be copied from original + // string to destination string. Resets when an escape or delimiter char is + // encountered. + int startOffset = 0; + StringBuilder builder = new StringBuilder(len); + // Iterate over the string till we reach the end. + while (offset < len) { + if (str.charAt(offset) == escapeChar) { + // An escape character must be followed by a delimiter or escape char + // but we have reached the end and have no further character to look at. + if (offset + 1 >= len) { + throw new IllegalArgumentException( + "Escape char not properly escaped."); + } + char nextChar = str.charAt(offset + 1); + // Next character must be a delimiter or an escape char. + if (nextChar != escapeChar && nextChar != delimiterChar) { + throw new IllegalArgumentException( + "Escape char or delimiter char not properly escaped."); + } + // Copy contents from the offset where last escape or delimiter char was + // encountered. + if (startOffset < offset) { + builder.append(str.substring(startOffset, offset)); + } + builder.append(nextChar); + offset += 2; + // Reset the start offset as an escape char has been encountered. + startOffset = offset; + continue; + } else if (str.charAt(offset) == delimiterChar) { + // A delimiter has been encountered without an escape character. + // String needs to be split here. Copy remaining chars and add the + // string to list. + builder.append(str.substring(startOffset, offset)); + list.add(builder.toString().trim()); + // Reset the start offset as a delimiter has been encountered. + startOffset = ++offset; + builder = new StringBuilder(len - offset); + continue; + } + offset++; + } + // Copy rest of the characters. + if (!str.isEmpty()) { + builder.append(str.substring(startOffset)); + } + // Add the last part of delimited string to list. + list.add(builder.toString().trim()); + return list; + } + + private static String escapeString(final String str, final char delimiterChar, + final char escapeChar) { + if (str == null) { + return null; + } + int len = str.length(); + if (len == 0) { + return ""; + } + StringBuilder builder = new StringBuilder(); + // Keeps track of offset of the passed string. + int offset = 0; + // Indicates start offset from which characters will be copied from original + // string to destination string. Resets when an escape or delimiter char is + // encountered. + int startOffset = 0; + // Iterate over the string till we reach the end. + while (offset < len) { + char charAtOffset = str.charAt(offset); + if (charAtOffset == escapeChar || charAtOffset == delimiterChar) { + // If an escape or delimiter character is encountered, copy characters + // from the offset where escape or delimiter was last encountered. + if (startOffset < offset) { + builder.append(str.substring(startOffset, offset)); + } + // Append escape char before delimiter/escape char. + builder.append(escapeChar).append(charAtOffset); + // Reset start offset for copying characters when next escape/delimiter + // char is encountered. + startOffset = offset + 1; + } + offset++; + } + // Copy remaining characters. + builder.append(str.substring(startOffset)); + return builder.toString(); + } + + /** + * Join different strings in the passed string array delimited by passed + * delimiter with delimiter and escape character escaped using passed escape + * char. + * @param strs strings to be joined. + * @param delimiterChar delimiter used to join strings. + * @param escapeChar escape character used to escape delimiter and escape + * char. + * @return a single string joined using delimiter and properly escaped. + */ + static String joinAndEscapeStrings(final String[] strs, + final char delimiterChar, final char escapeChar) { + int len = strs.length; + // Escape each string in string array. + for (int index = 0; index < len; index++) { + if (strs[index] == null) { + return null; + } + strs[index] = escapeString(strs[index], delimiterChar, escapeChar); + } + // Join the strings after they have been escaped. + return StringUtils.join(strs, delimiterChar); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org