YARN-3134. Implemented Phoenix timeline writer to access HBase backend. Contributed by Li Lu.

(cherry picked from commit b3b791be466be79e4e964ad068f7a6ec701e22e1)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9b794ab1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9b794ab1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9b794ab1

Branch: refs/heads/YARN-2928
Commit: 9b794ab16dde75a088469f6393aab73f198e2a9a
Parents: ed95a79
Author: Zhijie Shen <zjs...@apache.org>
Authored: Fri May 8 19:08:02 2015 -0700
Committer: Sangjin Lee <sj...@apache.org>
Committed: Tue Aug 25 10:47:11 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../dev-support/findbugs-exclude.xml            |  11 +
 .../hadoop-yarn-server-timelineservice/pom.xml  |  17 +
 .../collector/TimelineCollector.java            |  13 +-
 .../collector/TimelineCollectorManager.java     |  19 +
 .../storage/PhoenixTimelineWriterImpl.java      | 509 +++++++++++++++++++
 .../storage/TestPhoenixTimelineWriterImpl.java  | 125 +++++
 .../storage/TestTimelineWriterImpl.java         |  74 +++
 8 files changed, 760 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b794ab1/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 82f95d6..a290c22 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -64,6 +64,9 @@ Branch YARN-2928: Timeline Server Next Generation: Phase 1
     YARN-3562. unit tests failures and issues found from findbug from earlier
     ATS checkins (Naganarasimha G R via sjlee)
 
+    YARN-3134. Implemented Phoenix timeline writer to access HBase backend. (Li
+    Lu via zjshen)
+
   IMPROVEMENTS
 
   OPTIMIZATIONS

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b794ab1/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 114851f..d25d1d9 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -485,6 +485,17 @@
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
+  <!-- Ignore SQL_PREPARED_STATEMENT_GENERATED_FROM_NONCONSTANT_STRING warnings for Timeline Phoenix storage. -->
+  <!-- Since we're using dynamic columns, we have to generate SQL statements dynamically -->
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.timelineservice.storage.PhoenixTimelineWriterImpl" />
+    <Or>
+      <Method name="storeEntityVariableLengthFields" />
+      <Method name="storeEvents" />
+      <Method name="storeMetrics" />
+      <Method name="write" />
+    </Or>
+  </Match>
   
   <!-- Following fields are used in ErrorsAndWarningsBlock, which is not a part of analysis of findbugs -->
   <Match>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b794ab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
index f974aee..f62230f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/pom.xml
@@ -120,6 +120,23 @@
       <artifactId>mockito-all</artifactId>
       <scope>test</scope>
     </dependency>
+
+    <dependency>
+    <groupId>org.apache.phoenix</groupId>
+    <artifactId>phoenix-core</artifactId>
+    <version>4.3.0</version>
+    <exclusions>
+      <!-- Exclude jline from here -->
+      <exclusion>
+        <artifactId>jline</artifactId>
+        <groupId>jline</groupId>
+      </exclusion>
+    </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.google.guava</groupId>
+      <artifactId>guava</artifactId>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b794ab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
index 4eced5b..bb7db12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollector.java
@@ -27,11 +27,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.service.CompositeService;
-import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
 import 
org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 
 /**
@@ -55,11 +52,6 @@ public abstract class TimelineCollector extends CompositeService {
   @Override
   protected void serviceInit(Configuration conf) throws Exception {
     super.serviceInit(conf);
-    writer = ReflectionUtils.newInstance(conf.getClass(
-        YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
-        FileSystemTimelineWriterImpl.class,
-        TimelineWriter.class), conf);
-    writer.init(conf);
   }
 
   @Override
@@ -70,11 +62,10 @@ public abstract class TimelineCollector extends CompositeService {
   @Override
   protected void serviceStop() throws Exception {
     super.serviceStop();
-    writer.stop();
   }
 
-  public TimelineWriter getWriter() {
-    return writer;
+  protected void setWriter(TimelineWriter w) {
+    this.writer = w;
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b794ab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
index 7b3da6b..953d9b7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/collector/TimelineCollectorManager.java
@@ -23,9 +23,14 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
+import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
 
 import java.util.Collections;
 import java.util.HashMap;
@@ -42,6 +47,19 @@ public abstract class TimelineCollectorManager extends AbstractService {
   private static final Log LOG =
       LogFactory.getLog(TimelineCollectorManager.class);
 
+  protected TimelineWriter writer;
+
+  @Override
+  public void serviceInit(Configuration conf) throws Exception {
+    writer = ReflectionUtils.newInstance(conf.getClass(
+        YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
+        FileSystemTimelineWriterImpl.class,
+        TimelineWriter.class), conf);
+    writer.init(conf);
+    super.serviceInit(conf);
+  }
+
+
   // access to this map is synchronized with the map itself
   private final Map<ApplicationId, TimelineCollector> collectors =
       Collections.synchronizedMap(
@@ -69,6 +87,7 @@ public abstract class TimelineCollectorManager extends AbstractService {
           // initialize, start, and add it to the collection so it can be
           // cleaned up when the parent shuts down
           collector.init(getConfig());
+          collector.setWriter(writer);
           collector.start();
           collectors.put(appId, collector);
           LOG.info("the collector for " + appId + " was added");

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b794ab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
new file mode 100644
index 0000000..af8a233
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/PhoenixTimelineWriterImpl.java
@@ -0,0 +1,509 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineWriteResponse;
+import org.apache.hadoop.yarn.server.timeline.GenericObjectMapper;
+import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+@Private
+@Unstable
+public class PhoenixTimelineWriterImpl extends AbstractService
+    implements TimelineWriter {
+
+  private static final Log LOG
+      = LogFactory.getLog(PhoenixTimelineWriterImpl.class);
+  private static final String PHOENIX_COL_FAMILY_PLACE_HOLDER
+      = "timeline_cf_placeholder";
+  // These lists do not take effect in table creation.
+  private static final String[] PHOENIX_STORAGE_PK_LIST
+      = {"cluster", "user", "flow_name", "flow_version", "flow_run", "app_id",
+         "type", "entity_id"};
+  private static final String[] TIMELINE_EVENT_EXTRA_PK_LIST =
+      {"timestamp", "event_id"};
+  private static final String[] TIMELINE_METRIC_EXTRA_PK_LIST =
+      {"metric_id"};
+  /** Default Phoenix JDBC driver name */
+  private static final String DRIVER_CLASS_NAME
+      = "org.apache.phoenix.jdbc.PhoenixDriver";
+
+  /** Default Phoenix timeline entity table name */
+  @VisibleForTesting
+  static final String ENTITY_TABLE_NAME = "timeline_entity";
+  /** Default Phoenix event table name */
+  @VisibleForTesting
+  static final String EVENT_TABLE_NAME = "timeline_event";
+  /** Default Phoenix metric table name */
+  @VisibleForTesting
+  static final String METRIC_TABLE_NAME = "metric_singledata";
+
+  /** Default Phoenix timeline config column family */
+  private static final String CONFIG_COLUMN_FAMILY = "c.";
+  /** Default Phoenix timeline info column family */
+  private static final String INFO_COLUMN_FAMILY = "i.";
+  /** Default Phoenix event info column family */
+  private static final String EVENT_INFO_COLUMN_FAMILY = "ei.";
+  /** Default Phoenix isRelatedTo column family */
+  private static final String IS_RELATED_TO_FAMILY = "ir.";
+  /** Default Phoenix relatesTo column family */
+  private static final String RELATES_TO_FAMILY = "rt.";
+  /** Default separator for Phoenix storage */
+  private static final String PHOENIX_STORAGE_SEPARATOR = ";";
+
+  /** Connection string to the deployed Phoenix cluster */
+  static final String CONN_STRING = "jdbc:phoenix:localhost:2181:/hbase";
+
+  PhoenixTimelineWriterImpl() {
+    super((PhoenixTimelineWriterImpl.class.getName()));
+  }
+
+  @Override
+  protected void serviceInit(Configuration conf) throws Exception {
+    createTables();
+    super.init(conf);
+  }
+
+  @Override
+  protected void serviceStop() throws Exception {
+    super.serviceStop();
+  }
+
+  @Override
+  public TimelineWriteResponse write(String clusterId, String userId,
+      String flowName, String flowVersion, long flowRunId, String appId,
+      TimelineEntities entities) throws IOException {
+    TimelineWriteResponse response = new TimelineWriteResponse();
+    TimelineCollectorContext currContext = new TimelineCollectorContext(
+        clusterId, userId, flowName, flowVersion, flowRunId, appId);
+    String sql = "UPSERT INTO " + ENTITY_TABLE_NAME
+        + " (" + StringUtils.join(PHOENIX_STORAGE_PK_LIST, ",")
+        + ", creation_time, modified_time, configs) "
+        + "VALUES (" + StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length)
+        + "?, ?, ?)";
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("TimelineEntity write SQL: " + sql);
+    }
+
+    try (Connection conn = getConnection();
+        PreparedStatement ps = conn.prepareStatement(sql)) {
+      for (TimelineEntity entity : entities.getEntities()) {
+        int idx = setStringsForPrimaryKey(ps, currContext, entity, 1);
+        ps.setLong(idx++, entity.getCreatedTime());
+        ps.setLong(idx++, entity.getModifiedTime());
+        String configKeys = StringUtils.join(
+            entity.getConfigs().keySet(), PHOENIX_STORAGE_SEPARATOR);
+        ps.setString(idx++, configKeys);
+        ps.execute();
+
+        storeEntityVariableLengthFields(entity, currContext, conn);
+        storeEvents(entity, currContext, conn);
+        storeMetrics(entity, currContext, conn);
+
+        conn.commit();
+      }
+    } catch (SQLException se) {
+      LOG.error("Failed to add entity to Phoenix " + se.getMessage());
+      throw new IOException(se);
+    } catch (Exception e) {
+      LOG.error("Exception on getting connection: " + e.getMessage());
+      throw new IOException(e);
+    }
+    return response;
+  }
+
+  /**
+   * Aggregates the entity information to the timeline store based on which
+   * track this entity is to be rolled up to. The tracks along which aggregations
+   * are to be done are given by {@link TimelineAggregationTrack}
+   *
+   * Any errors occurring for individual write request objects will be reported
+   * in the response.
+   *
+   * @param data a {@link TimelineEntity} object
+   * @param track a {@link TimelineAggregationTrack} enum value
+   * @return a {@link TimelineWriteResponse} object.
+   * @throws IOException
+   */
+  @Override
+  public TimelineWriteResponse aggregate(TimelineEntity data,
+      TimelineAggregationTrack track) throws IOException {
+    return null;
+
+  }
+
+  // Utility functions
+  @Private
+  @VisibleForTesting
+  static Connection getConnection() throws IOException {
+    Connection conn;
+    try {
+      Class.forName(DRIVER_CLASS_NAME);
+      conn = DriverManager.getConnection(CONN_STRING);
+      conn.setAutoCommit(false);
+    } catch (SQLException se) {
+      LOG.error("Failed to connect to phoenix server! "
+          + se.getLocalizedMessage());
+      throw new IOException(se);
+    } catch (ClassNotFoundException e) {
+      LOG.error("Class not found! " + e.getLocalizedMessage());
+      throw new IOException(e);
+    }
+    return conn;
+  }
+
+  private void createTables() throws Exception {
+    // Create tables if necessary
+    try (Connection conn = getConnection();
+        Statement stmt = conn.createStatement()) {
+      // Table schema defined as in YARN-3134.
+      String sql = "CREATE TABLE IF NOT EXISTS " + ENTITY_TABLE_NAME
+          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+          + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
+          + "flow_run UNSIGNED_LONG NOT NULL, "
+          + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
+          + "entity_id VARCHAR NOT NULL, "
+          + "creation_time UNSIGNED_LONG, modified_time UNSIGNED_LONG, "
+          + "configs VARCHAR, "
+          + CONFIG_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
+          + INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY, "
+          + IS_RELATED_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR, "
+          + RELATES_TO_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARCHAR "
+          + "CONSTRAINT pk PRIMARY KEY("
+          + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
+          + "type, entity_id))";
+      stmt.executeUpdate(sql);
+      sql = "CREATE TABLE IF NOT EXISTS " + EVENT_TABLE_NAME
+          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+          + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
+          + "flow_run UNSIGNED_LONG NOT NULL, "
+          + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
+          + "entity_id VARCHAR NOT NULL, "
+          + "timestamp UNSIGNED_LONG NOT NULL, event_id VARCHAR NOT NULL, "
+          + EVENT_INFO_COLUMN_FAMILY + PHOENIX_COL_FAMILY_PLACE_HOLDER + " VARBINARY "
+          + "CONSTRAINT pk PRIMARY KEY("
+          + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
+          + "type, entity_id, timestamp DESC, event_id))";
+      stmt.executeUpdate(sql);
+      sql = "CREATE TABLE IF NOT EXISTS " + METRIC_TABLE_NAME
+          + "(user VARCHAR NOT NULL, cluster VARCHAR NOT NULL, "
+          + "flow_name VARCHAR NOT NULL, flow_version VARCHAR NOT NULL, "
+          + "flow_run UNSIGNED_LONG NOT NULL, "
+          + "app_id VARCHAR NOT NULL, type VARCHAR NOT NULL, "
+          + "entity_id VARCHAR NOT NULL, "
+          + "metric_id VARCHAR NOT NULL, "
+          + "singledata VARBINARY, "
+          + "time UNSIGNED_LONG "
+          + "CONSTRAINT pk PRIMARY KEY("
+          + "user, cluster, flow_name, flow_version, flow_run DESC, app_id, "
+          + "type, entity_id, metric_id))";
+      stmt.executeUpdate(sql);
+      conn.commit();
+    } catch (SQLException se) {
+      LOG.error("Failed in init data " + se.getLocalizedMessage());
+      throw se;
+    }
+    return;
+  }
+
+  private static class DynamicColumns<K> {
+    static final String COLUMN_FAMILY_TYPE_BYTES = " VARBINARY";
+    static final String COLUMN_FAMILY_TYPE_STRING = " VARCHAR";
+    String columnFamilyPrefix;
+    String type;
+    Set<K> columns;
+
+    public DynamicColumns(String columnFamilyPrefix, String type,
+        Set<K> keyValues) {
+      this.columnFamilyPrefix = columnFamilyPrefix;
+      this.columns = keyValues;
+      this.type = type;
+    }
+  }
+
+  private static <K> StringBuilder appendColumnsSQL(
+      StringBuilder colNames, DynamicColumns<K> cfInfo) {
+    // Prepare the sql template by iterating through all keys
+    for (K key : cfInfo.columns) {
+      colNames.append(",").append(cfInfo.columnFamilyPrefix)
+          .append(key.toString()).append(cfInfo.type);
+    }
+    return colNames;
+  }
+
+  private static <K, V> int setValuesForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos,
+      boolean converToBytes) throws SQLException {
+    int idx = startPos;
+    for (Map.Entry<K, V> entry : keyValues.entrySet()) {
+      V value = entry.getValue();
+      if (value instanceof Collection) {
+        ps.setString(idx++, StringUtils.join(
+            (Collection) value, PHOENIX_STORAGE_SEPARATOR));
+      } else {
+        if (converToBytes) {
+          try {
+            ps.setBytes(idx++, GenericObjectMapper.write(entry.getValue()));
+          } catch (IOException ie) {
+            LOG.error("Exception in converting values into bytes "
+                + ie.getMessage());
+            throw new SQLException(ie);
+          }
+        } else {
+          ps.setString(idx++, value.toString());
+        }
+      }
+    }
+    return idx;
+  }
+
+  private static <K, V> int setBytesForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos)
+      throws SQLException {
+    return setValuesForColumnFamily(ps, keyValues, startPos, true);
+  }
+
+  private static <K, V> int setStringsForColumnFamily(
+      PreparedStatement ps, Map<K, V> keyValues, int startPos)
+      throws SQLException {
+    return setValuesForColumnFamily(ps, keyValues, startPos, false);
+  }
+
+  private static int setStringsForPrimaryKey(PreparedStatement ps,
+      TimelineCollectorContext context, TimelineEntity entity, int startPos)
+      throws SQLException {
+    int idx = startPos;
+    ps.setString(idx++, context.getClusterId());
+    ps.setString(idx++, context.getUserId());
+    ps.setString(idx++,
+        context.getFlowName());
+    ps.setString(idx++, context.getFlowVersion());
+    ps.setLong(idx++, context.getFlowRunId());
+    ps.setString(idx++, context.getAppId());
+    ps.setString(idx++, entity.getType());
+    ps.setString(idx++, entity.getId());
+    return idx;
+  }
+
+  private static void storeEntityVariableLengthFields(TimelineEntity entity,
+      TimelineCollectorContext context, Connection conn) throws SQLException {
+    int numPlaceholders = 0;
+    StringBuilder columnDefs = new StringBuilder(
+        StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
+    if (entity.getConfigs() != null) {
+      Set<String> keySet = entity.getConfigs().keySet();
+      appendColumnsSQL(columnDefs, new DynamicColumns<>(
+          CONFIG_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
+          keySet));
+      numPlaceholders += keySet.size();
+    }
+    if (entity.getInfo() != null) {
+      Set<String> keySet = entity.getInfo().keySet();
+      appendColumnsSQL(columnDefs, new DynamicColumns<>(
+          INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
+          keySet));
+      numPlaceholders += keySet.size();
+    }
+    if (entity.getIsRelatedToEntities() != null) {
+      Set<String> keySet = entity.getIsRelatedToEntities().keySet();
+      appendColumnsSQL(columnDefs, new DynamicColumns<>(
+          IS_RELATED_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
+          keySet));
+      numPlaceholders += keySet.size();
+    }
+    if (entity.getRelatesToEntities() != null) {
+      Set<String> keySet = entity.getRelatesToEntities().keySet();
+      appendColumnsSQL(columnDefs, new DynamicColumns<>(
+          RELATES_TO_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_STRING,
+          keySet));
+      numPlaceholders += keySet.size();
+    }
+    if (numPlaceholders == 0) {
+      return;
+    }
+    StringBuilder placeholders = new StringBuilder();
+    placeholders.append(
+        StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length));
+    // numPlaceholders >= 1 now
+    placeholders.append("?")
+        .append(StringUtils.repeat(",?", numPlaceholders - 1));
+    String sqlVariableLengthFields = new StringBuilder("UPSERT INTO ")
+        .append(ENTITY_TABLE_NAME).append(" (").append(columnDefs)
+        .append(") VALUES(").append(placeholders).append(")").toString();
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("SQL statement for variable length fields: "
+          + sqlVariableLengthFields);
+    }
+    // Use try with resource statement for the prepared statement
+    try (PreparedStatement psVariableLengthFields =
+        conn.prepareStatement(sqlVariableLengthFields)) {
+      int idx = setStringsForPrimaryKey(
+          psVariableLengthFields, context, entity, 1);
+      if (entity.getConfigs() != null) {
+        idx = setStringsForColumnFamily(
+            psVariableLengthFields, entity.getConfigs(), idx);
+      }
+      if (entity.getInfo() != null) {
+        idx = setBytesForColumnFamily(
+            psVariableLengthFields, entity.getInfo(), idx);
+      }
+      if (entity.getIsRelatedToEntities() != null) {
+        idx = setStringsForColumnFamily(
+            psVariableLengthFields, entity.getIsRelatedToEntities(), idx);
+      }
+      if (entity.getRelatesToEntities() != null) {
+        idx = setStringsForColumnFamily(
+            psVariableLengthFields, entity.getRelatesToEntities(), idx);
+      }
+      psVariableLengthFields.execute();
+    }
+  }
+
+  private static void storeMetrics(TimelineEntity entity,
+      TimelineCollectorContext context, Connection conn) throws SQLException {
+    if (entity.getMetrics() == null) {
+      return;
+    }
+    Set<TimelineMetric> metrics = entity.getMetrics();
+    for (TimelineMetric metric : metrics) {
+      StringBuilder sqlColumns = new StringBuilder(
+          StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
+      sqlColumns.append(",")
+          .append(StringUtils.join(TIMELINE_METRIC_EXTRA_PK_LIST, ","));
+      sqlColumns.append(",").append("singledata, time");
+      StringBuilder placeholders = new StringBuilder();
+      placeholders.append(
+          StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
+          .append(StringUtils.repeat("?,", TIMELINE_METRIC_EXTRA_PK_LIST.length));
+      placeholders.append("?, ?");
+      String sqlMetric = new StringBuilder("UPSERT INTO ")
+          .append(METRIC_TABLE_NAME).append(" (").append(sqlColumns)
+          .append(") VALUES(").append(placeholders).append(")").toString();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SQL statement for metric: " + sqlMetric);
+      }
+      try (PreparedStatement psMetrics = conn.prepareStatement(sqlMetric)) {
+        if (metric.getType().equals(TimelineMetric.Type.TIME_SERIES)) {
+          LOG.warn("The incoming timeline metric contains time series data, "
+              + "which is currently not supported by Phoenix storage. "
+              + "Time series will be truncated. ");
+        }
+        int idx = setStringsForPrimaryKey(psMetrics, context, entity, 1);
+        psMetrics.setString(idx++, metric.getId());
+        Iterator<Map.Entry<Long, Number>> currNumIter =
+            metric.getValues().entrySet().iterator();
+        if (currNumIter.hasNext()) {
+          // TODO: support time series storage
+          Map.Entry<Long, Number> currEntry = currNumIter.next();
+          psMetrics.setBytes(idx++,
+              GenericObjectMapper.write(currEntry.getValue()));
+          psMetrics.setLong(idx++, currEntry.getKey());
+        } else {
+          psMetrics.setBytes(idx++, GenericObjectMapper.write(null));
+          LOG.warn("The incoming metric contains an empty value set. ");
+        }
+        psMetrics.execute();
+      } catch (IOException ie) {
+        LOG.error("Exception on converting single data to bytes: "
+            + ie.getMessage());
+        throw new SQLException(ie);
+      }
+    }
+  }
+
+  private static void storeEvents(TimelineEntity entity,
+      TimelineCollectorContext context, Connection conn) throws SQLException {
+    if (entity.getEvents() == null) {
+      return;
+    }
+    Set<TimelineEvent> events = entity.getEvents();
+    for (TimelineEvent event : events) {
+      // We need this number to check if the incoming event's info field is empty
+      int numPlaceholders = 0;
+      StringBuilder sqlColumns = new StringBuilder(
+          StringUtils.join(PHOENIX_STORAGE_PK_LIST, ","));
+      sqlColumns.append(",")
+          .append(StringUtils.join(TIMELINE_EVENT_EXTRA_PK_LIST, ","));
+      appendColumnsSQL(sqlColumns, new DynamicColumns<>(
+          EVENT_INFO_COLUMN_FAMILY, DynamicColumns.COLUMN_FAMILY_TYPE_BYTES,
+          event.getInfo().keySet()));
+      numPlaceholders += event.getInfo().keySet().size();
+      if (numPlaceholders == 0) {
+        continue;
+      }
+      StringBuilder placeholders = new StringBuilder();
+      placeholders.append(
+          StringUtils.repeat("?,", PHOENIX_STORAGE_PK_LIST.length))
+          .append(StringUtils.repeat("?,", TIMELINE_EVENT_EXTRA_PK_LIST.length));
+      // numPlaceholders >= 1 now
+      placeholders.append("?")
+            .append(StringUtils.repeat(",?", numPlaceholders - 1));
+      String sqlEvents = new StringBuilder("UPSERT INTO ")
+          .append(EVENT_TABLE_NAME).append(" (").append(sqlColumns)
+          .append(") VALUES(").append(placeholders).append(")").toString();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("SQL statement for events: " + sqlEvents);
+      }
+      try (PreparedStatement psEvent = conn.prepareStatement(sqlEvents)) {
+        int idx = setStringsForPrimaryKey(psEvent, context, entity, 1);
+        psEvent.setLong(idx++, event.getTimestamp());
+        psEvent.setString(idx++, event.getId());
+        setBytesForColumnFamily(psEvent, event.getInfo(), idx);
+        psEvent.execute();
+      }
+    }
+  }
+
+  // WARNING: This method will permanently drop a table!
+  @Private
+  @VisibleForTesting
+  void dropTable(String tableName) throws Exception {
+    try (Connection conn = getConnection();
+         Statement stmt = conn.createStatement()) {
+      String sql = "DROP TABLE " + tableName;
+      stmt.executeUpdate(sql);
+    } catch (SQLException se) {
+      LOG.error("Failed in dropping entity table " + se.getLocalizedMessage());
+      throw se;
+    }
+  }
+}
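
A minimal direct-usage sketch of the writer added above, mirroring the test below (assumes a
Phoenix/HBase cluster reachable at CONN_STRING and a caller in the same package, since the
constructor is package-private; this snippet is illustrative and not part of the patch):

    PhoenixTimelineWriterImpl writer = new PhoenixTimelineWriterImpl();
    writer.init(new YarnConfiguration());  // serviceInit() creates the three tables if absent
    TimelineEntities entities = new TimelineEntities();
    TimelineEntity entity = new TimelineEntity();
    entity.setId("entity_1");
    entity.setType("testType");
    entity.setCreatedTime(1425016501000L);
    entity.setModifiedTime(1425016502000L);
    entities.addEntity(entity);
    writer.write("cluster_1", "user1", "testFlow", "version1", 1L, "app_1", entities);
    writer.stop();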

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b794ab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
new file mode 100644
index 0000000..a55893e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestPhoenixTimelineWriterImpl.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+public class TestPhoenixTimelineWriterImpl {
+  private PhoenixTimelineWriterImpl writer;
+
+  @Before
+  public void setup() throws Exception {
+    // TODO: launch a miniphoenix cluster, or else we're directly operating on
+    // the active Phoenix cluster
+    YarnConfiguration conf = new YarnConfiguration();
+    writer = createPhoenixWriter(conf);
+  }
+
+  @Ignore
+  @Test
+  public void testPhoenixWriterBasic() throws Exception {
+    // Set up a list of timeline entities and write them back to Phoenix
+    int numEntity = 12;
+    TimelineEntities te =
+        TestTimelineWriterImpl.getStandardTestTimelineEntities(numEntity);
+    writer.write("cluster_1", "user1", "testFlow", "version1", 1l, "app_test_1", te);
+    // Verify if we're storing all entities
+    String sql = "SELECT COUNT(entity_id) FROM "
+        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME;
+    verifySQLWithCount(sql, numEntity, "Number of entities should be ");
+    // Check config (half of all entities)
+    sql = "SELECT COUNT(c.config) FROM "
+        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(c.config VARCHAR) ";
+    verifySQLWithCount(sql, (numEntity / 2),
+        "Number of entities with config should be ");
+    // Check info (half of all entities)
+    sql = "SELECT COUNT(i.info1) FROM "
+        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME + "(i.info1 VARBINARY) ";
+    verifySQLWithCount(sql, (numEntity / 2),
+        "Number of entities with info should be ");
+    // Check config and info (a quarter of all entities)
+    sql = "SELECT COUNT(entity_id) FROM "
+        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
+        + "(c.config VARCHAR, i.info1 VARBINARY) "
+        + "WHERE c.config IS NOT NULL AND i.info1 IS NOT NULL";
+    verifySQLWithCount(sql, (numEntity / 4),
+        "Number of entities with both config and info should be ");
+    // Check relatesToEntities and isRelatedToEntities
+    sql = "SELECT COUNT(entity_id) FROM "
+        + PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME
+        + "(rt.testType VARCHAR, ir.testType VARCHAR) "
+        + "WHERE rt.testType IS NOT NULL AND ir.testType IS NOT NULL";
+    verifySQLWithCount(sql, numEntity - 2,
+        "Number of entities with both relatesTo and isRelatedTo should be ");
+    // Check event
+    sql = "SELECT COUNT(entity_id) FROM "
+        + PhoenixTimelineWriterImpl.EVENT_TABLE_NAME;
+    verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
+    // Check metrics
+    sql = "SELECT COUNT(entity_id) FROM "
+        + PhoenixTimelineWriterImpl.METRIC_TABLE_NAME;
+    verifySQLWithCount(sql, (numEntity / 4), "Number of events should be ");
+  }
+
+  @After
+  public void cleanup() throws Exception {
+    // Note: it is assumed that we're working on a test only cluster, or else
+    // this cleanup process will drop the entity table.
+    writer.dropTable(PhoenixTimelineWriterImpl.ENTITY_TABLE_NAME);
+    writer.dropTable(PhoenixTimelineWriterImpl.EVENT_TABLE_NAME);
+    writer.dropTable(PhoenixTimelineWriterImpl.METRIC_TABLE_NAME);
+    writer.serviceStop();
+  }
+
+  private static PhoenixTimelineWriterImpl createPhoenixWriter(
+      YarnConfiguration conf) throws Exception{
+    PhoenixTimelineWriterImpl myWriter = new PhoenixTimelineWriterImpl();
+    myWriter.serviceInit(conf);
+    return myWriter;
+  }
+
+  private void verifySQLWithCount(String sql, int targetCount, String message)
+      throws Exception{
+    try (
+        Statement stmt =
+          PhoenixTimelineWriterImpl.getConnection().createStatement();
+        ResultSet rs = stmt.executeQuery(sql)) {
+      assertTrue("Result set empty on statement " + sql, rs.next());
+      assertNotNull("Fail to execute query " + sql, rs);
+      assertEquals(message + " " + targetCount, targetCount, rs.getInt(1));
+    } catch (SQLException se) {
+      fail("SQL exception on query: " + sql
+          + " With exception message: " + se.getLocalizedMessage());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9b794ab1/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
new file mode 100644
index 0000000..7a7afc0
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterImpl.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.timelineservice.storage;
+
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
+import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
+
+public class TestTimelineWriterImpl {
+  static TimelineEntities getStandardTestTimelineEntities(int listSize) {
+    TimelineEntities te = new TimelineEntities();
+    for (int i = 0; i < listSize; i++) {
+      TimelineEntity entity = new TimelineEntity();
+      String id = "hello" + i;
+      String type = "testType";
+      entity.setId(id);
+      entity.setType(type);
+      entity.setCreatedTime(1425016501000L + i);
+      entity.setModifiedTime(1425016502000L + i);
+      if (i > 0) {
+        entity.addRelatesToEntity(type, "hello" + i);
+        entity.addRelatesToEntity(type, "hello" + (i - 1));
+      }
+      if (i < listSize - 1) {
+        entity.addIsRelatedToEntity(type, "hello" + i);
+        entity.addIsRelatedToEntity(type, "hello" + (i + 1));
+      }
+      int category = i % 4;
+      switch (category) {
+      case 0:
+        entity.addConfig("config", "config" + i);
+        // Fall through deliberately
+      case 1:
+        entity.addInfo("info1", new Integer(i));
+        entity.addInfo("info2", "helloworld");
+        // Fall through deliberately
+      case 2:
+        break;
+      case 3:
+        entity.addConfig("config", "config" + i);
+        TimelineEvent event = new TimelineEvent();
+        event.setId("test event");
+        event.setTimestamp(1425016501100L + i);
+        event.addInfo("test_info", "content for " + entity.getId());
+        event.addInfo("test_info1", new Integer(i));
+        entity.addEvent(event);
+        TimelineMetric metric = new TimelineMetric();
+        metric.setId("HDFS_BYTES_READ");
+        metric.addValue(1425016501100L + i, 8000 + i);
+        entity.addMetric(metric);
+        break;
+      }
+      te.addEntity(entity);
+    }
+    return te;
+  }
+}
