This is an automated email from the ASF dual-hosted git repository.

tdsilva pushed a commit to branch 4.14-HBase-1.3
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/4.14-HBase-1.3 by this push:
     new 8512ece  PHOENIX-5316 Use callable instead of runnable so that Pherf exceptions cause tests to fail
8512ece is described below

commit 8512ecefd4370dfc4e3fc547de858ac8abe11984
Author: Thomas D'Silva <tdsi...@apache.org>
AuthorDate: Tue Jun 4 16:46:43 2019 -0700

    PHOENIX-5316 Use callable instead of runnable so that Pherf exceptions cause tests to fail
---
 .../java/org/apache/phoenix/pherf/PherfMainIT.java | 17 +++++--
 .../main/java/org/apache/phoenix/pherf/Pherf.java  |  7 ++-
 .../pherf/configuration/DataTypeMapping.java       |  4 +-
 .../phoenix/pherf/configuration/Scenario.java      |  2 +
 .../pherf/configuration/XMLConfigParser.java       |  3 +-
 .../apache/phoenix/pherf/jmx/MonitorManager.java   |  9 ++--
 .../apache/phoenix/pherf/rules/RulesApplier.java   | 13 +++--
 .../pherf/workload/MultiThreadedRunner.java        | 27 ++++++-----
 .../pherf/workload/MultithreadedDiffer.java        |  6 ++-
 .../phoenix/pherf/workload/QueryExecutor.java      | 56 ++++++++++------------
 .../apache/phoenix/pherf/workload/Workload.java    |  4 +-
 .../phoenix/pherf/workload/WorkloadExecutor.java   |  5 +-
 .../phoenix/pherf/workload/WriteWorkload.java      | 53 ++++++++++++--------
 .../scenario/prod_test_unsalted_scenario.xml       |  6 +--
 14 files changed, 129 insertions(+), 83 deletions(-)
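
(Context for why the patch switches Workload.execute() from Runnable to
Callable<Void>: Runnable.run() cannot throw checked exceptions, so the old
workloads caught and logged failures internally and the test never saw them;
Callable.call() can rethrow, and the exception is then captured by the Future
returned from ExecutorService.submit() and resurfaced by Future.get(), which
is what the new loop over pherf.workloadExecutor.jobs in PherfMainIT relies
on. The stand-alone sketch below is illustrative only; it is not part of the
patch, and the class and message names are made up.)

    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    // Hypothetical example; not taken from the Phoenix code base.
    public class CallableVsRunnableSketch {
        public static void main(String[] args) throws Exception {
            ExecutorService pool = Executors.newSingleThreadExecutor();

            // The exception thrown inside call() is stored in the Future ...
            Future<Void> job = pool.submit(new Callable<Void>() {
                @Override public Void call() throws Exception {
                    throw new RuntimeException("scenario failed");
                }
            });

            try {
                // ... and rethrown here as an ExecutionException, so a caller
                // (e.g. a test) can fail instead of silently continuing.
                job.get();
            } catch (ExecutionException e) {
                System.out.println("propagated: " + e.getCause().getMessage());
            } finally {
                pool.shutdown();
            }
        }
    }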

diff --git a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
index 2407ef4..6dc900e 100644
--- a/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
+++ b/phoenix-pherf/src/it/java/org/apache/phoenix/pherf/PherfMainIT.java
@@ -22,15 +22,24 @@ import org.junit.Rule;
 import org.junit.Test;
 import org.junit.contrib.java.lang.system.ExpectedSystemExit;
 
+import java.util.concurrent.Future;
+
 public class PherfMainIT extends ResultBaseTestIT {
     @Rule
     public final ExpectedSystemExit exit = ExpectedSystemExit.none();
 
     @Test
-    public void testPherfMain() {
-        String[] args = { "-q",
-                "--scenarioFile", ".*prod_test_unsalted_scenario.*",
+    public void testPherfMain() throws Exception {
+        String[] args = { "-q", "-l",
+                "--schemaFile", ".*create_prod_test_unsalted.sql",
+                "--scenarioFile", ".*prod_test_unsalted_scenario.xml",
                 "-m", "--monitorFrequency", "10" };
-        Pherf.main(args);
+        Pherf pherf = new Pherf(args);
+        pherf.run();
+
+        // verify that none of the scenarios threw any exceptions
+        for (Future<Void> future : pherf.workloadExecutor.jobs.values()) {
+            future.get();
+        }
     }
 }
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
index d92ffde..51d6743 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/Pherf.java
@@ -24,6 +24,8 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 
+import com.google.common.annotations.VisibleForTesting;
+import jline.internal.TestAccessible;
 import org.apache.commons.cli.CommandLine;
 import org.apache.commons.cli.CommandLineParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -115,6 +117,9 @@ public class Pherf {
     private final boolean thinDriver;
     private final String queryServerUrl;
 
+    @VisibleForTesting
+    WorkloadExecutor workloadExecutor;
+
     public Pherf(String[] args) throws Exception {
         CommandLineParser parser = new PosixParser();
         CommandLine command = null;
@@ -201,7 +206,7 @@ public class Pherf {
     public void run() throws Exception {
         MonitorManager monitorManager = null;
         List<Workload> workloads = new ArrayList<>();
-        WorkloadExecutor workloadExecutor = new WorkloadExecutor(properties, workloads, !isFunctional);
+        workloadExecutor = new WorkloadExecutor(properties, workloads, !isFunctional);
         try {
             if (listFiles) {
                 ResourceList list = new ResourceList(PherfConstants.RESOURCE_DATAMODEL);
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
index 0476df2..129bdc2 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/DataTypeMapping.java
@@ -29,7 +29,9 @@ public enum DataTypeMapping {
     UNSIGNED_LONG("UNSIGNED_LONG", Types.LONGVARCHAR),
     VARCHAR_ARRAY("VARCHAR ARRAY", Types.ARRAY),
     VARBINARY("VARBINARY", Types.VARBINARY),
-    TIMESTAMP("TIMESTAMP", Types.TIMESTAMP);
+    TIMESTAMP("TIMESTAMP", Types.TIMESTAMP),
+    BIGINT("BIGINT", Types.BIGINT),
+    TINYINT("TINYINT", Types.TINYINT);
 
     private final String sType;
 
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
index 132207b..db9a714 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/Scenario.java
@@ -27,6 +27,7 @@ import javax.xml.bind.annotation.XmlElement;
 import javax.xml.bind.annotation.XmlElementWrapper;
 import javax.xml.bind.annotation.XmlRootElement;
 
+import com.google.common.base.Preconditions;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
 
@@ -161,6 +162,7 @@ public class Scenario {
      */
     @XmlAttribute()
     public String getName() {
+        Preconditions.checkNotNull(name);
         return name;
     }
 
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
index a0ee471..8f2a1d8 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/configuration/XMLConfigParser.java
@@ -96,7 +96,8 @@ public class XMLConfigParser {
                     scenarios.add(scenario);
                 }
             } catch (JAXBException e) {
-                e.printStackTrace();
+                logger.error("Unable to parse scenario file "+path, e);
+                throw e;
             }
         }
         return scenarios;
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
index bb29902..5c434d8 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/jmx/MonitorManager.java
@@ -35,6 +35,7 @@ import javax.management.StandardMBean;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.util.*;
+import java.util.concurrent.Callable;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
@@ -91,9 +92,9 @@ public class MonitorManager implements Workload {
         this.shouldStop.set(true);
     }
 
-    @Override public Runnable execute() {
-        return new Runnable() {
-            @Override public void run() {
+    @Override public Callable<Void> execute() {
+        return new Callable<Void>() {
+            @Override public Void call() throws Exception {
                 try {
                     while (!shouldStop()) {
                         isRunning.set(true);
@@ -131,6 +132,7 @@ public class MonitorManager implements Workload {
                             } catch (Exception e) {
                                 Thread.currentThread().interrupt();
                                 e.printStackTrace();
+                                throw e;
                             }
                         }
                     }
@@ -144,6 +146,7 @@ public class MonitorManager implements Workload {
                         throw new FileLoaderRuntimeException("Could not close monitor results.", e);
                     }
                 }
+                return null;
             }
         };
     }
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
index 2afc29a..6d1e727 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/rules/RulesApplier.java
@@ -35,6 +35,7 @@ import org.slf4j.LoggerFactory;
 
 import java.text.SimpleDateFormat;
 import java.util.*;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicLong;
 
 public class RulesApplier {
@@ -228,24 +229,30 @@ public class RulesApplier {
                     data = new DataValue(column.getType(), String.valueOf(dbl));
                 }
                 break;
+            case TINYINT:
             case INTEGER:
                 if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
                     data = pickDataValueFromList(dataValues);
                 } else {
                     int minInt = (int) column.getMinValue();
                     int maxInt = (int) column.getMaxValue();
-                    Preconditions.checkArgument((minInt > 0) && (maxInt > 0), "min and max values need to be set in configuration for integers " + column.getName());
-                    int intVal = RandomUtils.nextInt(minInt, maxInt);
+                    if (column.getType() == DataTypeMapping.TINYINT) {
+                        Preconditions.checkArgument((minInt >= -128) && (minInt <= 128), "min value need to be set in configuration for tinyints " + column.getName());
+                        Preconditions.checkArgument((maxInt >= -128) && (maxInt <= 128), "max value need to be set in configuration for tinyints " + column.getName());
+                    }
+                    int intVal = ThreadLocalRandom.current().nextInt(minInt, maxInt + 1);
                     data = new DataValue(column.getType(), String.valueOf(intVal));
                 }
                 break;
+            case BIGINT:
             case UNSIGNED_LONG:
                 if ((column.getDataValues() != null) && (column.getDataValues().size() > 0)) {
                     data = pickDataValueFromList(dataValues);
                 } else {
                     long minLong = column.getMinValue();
                     long maxLong = column.getMaxValue();
-                    Preconditions.checkArgument((minLong > 0) && (maxLong > 0), "min and max values need to be set in configuration for unsigned_longs " + column.getName());
+                    if (column.getType() == DataTypeMapping.UNSIGNED_LONG)
+                        Preconditions.checkArgument((minLong > 0) && (maxLong > 0), "min and max values need to be set in configuration for unsigned_longs " + column.getName());
                     long longVal = RandomUtils.nextLong(minLong, maxLong);
                     data = new DataValue(column.getType(), String.valueOf(longVal));
                 }
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
index 7b9313f..4423bbd 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultiThreadedRunner.java
@@ -23,6 +23,7 @@ import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.util.Calendar;
 import java.util.Date;
+import java.util.concurrent.Callable;
 
 import org.apache.phoenix.pherf.result.DataModelResult;
 import org.apache.phoenix.pherf.result.ResultManager;
@@ -38,7 +39,7 @@ import org.apache.phoenix.pherf.configuration.WriteParams;
 import org.apache.phoenix.pherf.configuration.XMLConfigParser;
 import org.apache.phoenix.pherf.util.PhoenixUtil;
 
-class MultiThreadedRunner implements Runnable {
+class MultiThreadedRunner implements Callable<Void> {
     private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class);
     private Query query;
     private ThreadTime threadTime;
@@ -85,29 +86,28 @@ class MultiThreadedRunner implements Runnable {
      * Executes run for a minimum of number of execution or execution duration
      */
     @Override
-    public void run() {
+    public Void call() throws Exception {
         logger.info("\n\nThread Starting " + threadName + " ; " + 
query.getStatement() + " for "
                 + numberOfExecutions + "times\n\n");
         Long start = System.currentTimeMillis();
         for (long i = numberOfExecutions; (i > 0 && ((System.currentTimeMillis() - start)
                 < executionDurationInMs)); i--) {
-            try {
-                synchronized (resultManager) {
-                    timedQuery();
-                    if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
-                        resultManager.write(dataModelResult, ruleApplier);
-                        lastResultWritten = System.currentTimeMillis();
-                    }
+            synchronized (workloadExecutor) {
+                timedQuery();
+                if ((System.currentTimeMillis() - lastResultWritten) > 1000) {
+                    resultManager.write(dataModelResult, ruleApplier);
+                    lastResultWritten = System.currentTimeMillis();
                 }
-            } catch (Exception e) {
-                e.printStackTrace();
             }
         }
 
         // Make sure all result have been dumped before exiting
-        resultManager.flush();
+        synchronized (workloadExecutor) {
+            resultManager.flush();
+        }
 
         logger.info("\n\nThread exiting." + threadName + "\n\n");
+        return null;
     }
 
     private synchronized ThreadTime getThreadTime() {
@@ -165,8 +165,9 @@ class MultiThreadedRunner implements Runnable {
                 conn.commit();
             }
         } catch (Exception e) {
-            e.printStackTrace();
+            logger.error("Exception while executing query", e);
             exception = e.getMessage();
+            throw e;
         } finally {
             getThreadTime().getRunTimesInMs().add(new RunTime(exception, startDate, resultRowCount,
                     (int) (System.currentTimeMillis() - start)));
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
index decff51..6e828bd 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/MultithreadedDiffer.java
@@ -20,6 +20,7 @@ package org.apache.phoenix.pherf.workload;
 
 import java.util.Calendar;
 import java.util.Date;
+import java.util.concurrent.Callable;
 
 import org.apache.phoenix.pherf.PherfConstants;
 import org.apache.phoenix.pherf.configuration.Query;
@@ -29,7 +30,7 @@ import org.apache.phoenix.pherf.util.PhoenixUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-class MultithreadedDiffer implements Runnable {
+class MultithreadedDiffer implements Callable<Void> {
     private static final Logger logger = LoggerFactory.getLogger(MultiThreadedRunner.class);
     private Thread t;
     private Query query;
@@ -80,7 +81,7 @@ class MultithreadedDiffer implements Runnable {
     /**
      * Executes verification runs for a minimum of number of execution or execution duration
      */
-    public void run() {
+    public Void call() throws Exception {
         logger.info("\n\nThread Starting " + t.getName() + " ; " + 
query.getStatement() + " for "
                 + numberOfExecutions + "times\n\n");
         Long start = System.currentTimeMillis();
@@ -93,5 +94,6 @@ class MultithreadedDiffer implements Runnable {
             }
         }
         logger.info("\n\nThread exiting." + t.getName() + "\n\n");
+        return null;
     }
 }
\ No newline at end of file
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
index 8d0ced5..c4a3517 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/QueryExecutor.java
@@ -31,6 +31,7 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 
@@ -78,16 +79,16 @@ public class QueryExecutor implements Workload {
      * @throws Exception
      */
     @Override
-    public Runnable execute() throws Exception {
-        Runnable runnable = null;
+    public Callable<Void> execute() throws Exception {
+        Callable<Void> callable = null;
         for (DataModel dataModel : dataModels) {
             if (exportCSV) {
-                runnable = exportAllScenarios(dataModel);
+                callable = exportAllScenarios(dataModel);
             } else {
-                runnable = executeAllScenarios(dataModel);
+                callable = executeAllScenarios(dataModel);
             }
         }
-        return runnable;
+        return callable;
     }
 
     /**
@@ -96,12 +97,11 @@ public class QueryExecutor implements Workload {
      * @param dataModel
      * @throws Exception
      */
-    protected Runnable exportAllScenarios(final DataModel dataModel) throws Exception {
-        return new Runnable() {
+    protected Callable<Void> exportAllScenarios(final DataModel dataModel) throws Exception {
+        return new Callable<Void>() {
             @Override
-            public void run() {
+            public Void call() throws Exception {
                 try {
-
                     List<Scenario> scenarios = dataModel.getScenarios();
                     QueryVerifier exportRunner = new QueryVerifier(false);
                     for (Scenario scenario : scenarios) {
@@ -113,8 +113,10 @@ public class QueryExecutor implements Workload {
                         }
                     }
                 } catch (Exception e) {
-                    logger.warn("", e);
+                    logger.error("Scenario throws exception", e);
+                    throw e;
                 }
+                return null;
             }
         };
     }
@@ -125,9 +127,9 @@ public class QueryExecutor implements Workload {
      * @param dataModel
      * @throws Exception
      */
-    protected Runnable executeAllScenarios(final DataModel dataModel) throws Exception {
-        return new Runnable() {
-            @Override public void run() {
+    protected Callable<Void> executeAllScenarios(final DataModel dataModel) throws Exception {
+        return new Callable<Void>() {
+            @Override public Void call() throws Exception {
                 List<DataModelResult> dataModelResults = new ArrayList<>();
                 DataModelResult
                         dataModelResult =
@@ -163,8 +165,10 @@ public class QueryExecutor implements Workload {
                     resultManager.write(dataModelResults, ruleApplier);
                     resultManager.flush();
                 } catch (Exception e) {
-                    logger.warn("", e);
+                    logger.error("Scenario throws exception", e);
+                    throw e;
                 }
+                return null;
             }
         };
     }
@@ -179,7 +183,7 @@ public class QueryExecutor implements Workload {
      * @throws InterruptedException
      */
     protected void executeQuerySetSerial(DataModelResult dataModelResult, QuerySet querySet,
-            QuerySetResult querySetResult, Scenario scenario) throws InterruptedException {
+            QuerySetResult querySetResult, Scenario scenario) throws ExecutionException, InterruptedException {
         for (Query query : querySet.getQuery()) {
             QueryResult queryResult = new QueryResult(query);
             querySetResult.getQueryResults().add(queryResult);
@@ -190,7 +194,7 @@ public class QueryExecutor implements Workload {
 
                 for (int i = 0; i < cr; i++) {
 
-                    Runnable
+                    Callable
                             thread =
                             executeRunner((i + 1) + "," + cr, dataModelResult, 
queryResult,
                                     querySetResult, scenario);
@@ -198,11 +202,7 @@ public class QueryExecutor implements Workload {
                 }
 
                 for (Future thread : threads) {
-                    try {
-                        thread.get();
-                    } catch (ExecutionException e) {
-                        logger.error("", e);
-                    }
+                    thread.get();
                 }
             }
         }
@@ -217,7 +217,7 @@ public class QueryExecutor implements Workload {
      * @throws InterruptedException
      */
     protected void executeQuerySetParallel(DataModelResult dataModelResult, QuerySet querySet,
-            QuerySetResult querySetResult, Scenario scenario) throws InterruptedException {
+            QuerySetResult querySetResult, Scenario scenario) throws ExecutionException, InterruptedException {
         for (int cr = querySet.getMinConcurrency(); cr <= querySet.getMaxConcurrency(); cr++) {
             List<Future> threads = new ArrayList<>();
             for (int i = 0; i < cr; i++) {
@@ -225,7 +225,7 @@ public class QueryExecutor implements Workload {
                     QueryResult queryResult = new QueryResult(query);
                     querySetResult.getQueryResults().add(queryResult);
 
-                    Runnable
+                    Callable<Void>
                             thread =
                             executeRunner((i + 1) + "," + cr, dataModelResult, queryResult,
                                     querySetResult, scenario);
@@ -233,11 +233,7 @@ public class QueryExecutor implements Workload {
                 }
 
                 for (Future thread : threads) {
-                    try {
-                        thread.get();
-                    } catch (ExecutionException e) {
-                        logger.error("", e);
-                    }
+                    thread.get();
                 }
             }
         }
@@ -253,14 +249,14 @@ public class QueryExecutor implements Workload {
      * @param scenario 
      * @return
      */
-    protected Runnable executeRunner(String name, DataModelResult dataModelResult,
+    protected Callable<Void> executeRunner(String name, DataModelResult dataModelResult,
             QueryResult queryResult, QuerySet querySet, Scenario scenario) {
         ThreadTime threadTime = new ThreadTime();
         queryResult.getThreadTimes().add(threadTime);
         threadTime.setThreadName(name);
         queryResult.setHint(this.queryHint);
         logger.info("\nExecuting query " + queryResult.getStatement());
-        Runnable thread;
+        Callable<Void> thread;
         if (workloadExecutor.isPerformance()) {
             thread =
                     new MultiThreadedRunner(threadTime.getThreadName(), queryResult,
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
index 882fa50..0532201 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/Workload.java
@@ -17,8 +17,10 @@
  */
 package org.apache.phoenix.pherf.workload;
 
+import java.util.concurrent.Callable;
+
 public interface Workload {
-    public Runnable execute() throws Exception;
+    public Callable<Void> execute() throws Exception;
 
     /**
      * Use this method to perform any cleanup or forced shutdown of the thread.
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
index 3cde7ae..4abb574 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WorkloadExecutor.java
@@ -18,6 +18,8 @@
 
 package org.apache.phoenix.pherf.workload;
 
+import com.google.common.annotations.VisibleForTesting;
+import jline.internal.TestAccessible;
 import org.apache.phoenix.pherf.PherfConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -34,7 +36,8 @@ public class WorkloadExecutor {
     private final boolean isPerformance;
 
     // Jobs can be accessed by multiple threads
-    private final Map<Workload, Future> jobs = new ConcurrentHashMap<>();
+    @VisibleForTesting
+    public final Map<Workload, Future> jobs = new ConcurrentHashMap<>();
 
     private final ExecutorService pool;
 
diff --git a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
index b340a2b..cae223c 100644
--- a/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
+++ b/phoenix-pherf/src/main/java/org/apache/phoenix/pherf/workload/WriteWorkload.java
@@ -151,9 +151,9 @@ public class WriteWorkload implements Workload {
         pool.shutdownNow();
     }
 
-    public Runnable execute() throws Exception {
-        return new Runnable() {
-            @Override public void run() {
+    public Callable<Void> execute() throws Exception {
+        return new Callable<Void>() {
+            @Override public Void call() throws Exception {
                 try {
                     DataLoadTimeSummary dataLoadTimeSummary = new DataLoadTimeSummary();
                     DataLoadThreadTime dataLoadThreadTime = new DataLoadThreadTime();
@@ -169,8 +169,10 @@ public class WriteWorkload implements Workload {
                     resultUtil.write(dataLoadThreadTime);
 
                 } catch (Exception e) {
-                    logger.warn("", e);
+                    logger.error("WriteWorkLoad failed", e);
+                    throw e;
                 }
+                return null;
             }
         };
     }
@@ -292,21 +294,17 @@ public class WriteWorkload implements Workload {
                                     rowsCreated += result;
                                 }
                             }
-                            try {
-                                connection.commit();
-                                duration = System.currentTimeMillis() - last;
-                                logger.info("Writer (" + 
Thread.currentThread().getName()
-                                        + ") committed Batch. Total " + 
getBatchSize()
-                                        + " rows for this thread (" + 
this.hashCode() + ") in ("
-                                        + duration + ") Ms");
-
-                                if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
-                                    dataLoadThreadTime.add(tableName,
-                                        Thread.currentThread().getName(), i,
-                                        System.currentTimeMillis() - logStartTime);
-                                }
-                            } catch (SQLException e) {
-                                logger.warn("SQLException in commit 
operation", e);
+                            connection.commit();
+                            duration = System.currentTimeMillis() - last;
+                            logger.info("Writer (" + 
Thread.currentThread().getName()
+                                    + ") committed Batch. Total " + 
getBatchSize()
+                                    + " rows for this thread (" + 
this.hashCode() + ") in ("
+                                    + duration + ") Ms");
+
+                            if (i % PherfConstants.LOG_PER_NROWS == 0 && i != 0) {
+                                dataLoadThreadTime.add(tableName,
+                                    Thread.currentThread().getName(), i,
+                                    System.currentTimeMillis() - logStartTime);
                             }
 
                             logStartTime = System.currentTimeMillis();
@@ -317,6 +315,7 @@ public class WriteWorkload implements Workload {
                         }
                     }
                 } catch (SQLException e) {
+                    logger.error("Scenario " + scenario.getName() + " failed 
with exception ", e);
                     throw e;
                 } finally {
                     // Need to keep the statement open to send the remaining batch of updates
@@ -396,11 +395,25 @@ public class WriteWorkload implements Workload {
                 break;
             case UNSIGNED_LONG:
                 if (dataValue.getValue().equals("")) {
-                    statement.setNull(count, Types.LONGVARCHAR);
+                    statement.setNull(count, Types.OTHER);
+                } else {
+                    statement.setLong(count, Long.parseLong(dataValue.getValue()));
+                }
+                break;
+            case BIGINT:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.BIGINT);
                 } else {
                     statement.setLong(count, Long.parseLong(dataValue.getValue()));
                 }
                 break;
+            case TINYINT:
+                if (dataValue.getValue().equals("")) {
+                    statement.setNull(count, Types.TINYINT);
+                } else {
+                    statement.setLong(count, Integer.parseInt(dataValue.getValue()));
+                }
+                break;
             case DATE:
                 if (dataValue.getValue().equals("")) {
                     statement.setNull(count, Types.DATE);
diff --git a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
index 1c32b75..fb89ef3 100644
--- a/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
+++ b/phoenix-pherf/src/main/resources/scenario/prod_test_unsalted_scenario.xml
@@ -316,12 +316,12 @@
             </dataOverride>
             
            <preScenarioDdls>
-                <ddl>CREATE INDEX IDX_DIVISION ON PHERF.PHERF_PROD_TEST_UNSALTED (DIVISION)</ddl>
+                <ddl statement="CREATE INDEX IDX_DIVISION ON PHERF.PHERF_PROD_TEST_UNSALTED (DIVISION)"/>
             </preScenarioDdls>
 
            <postScenarioDdls>
-                <ddl>CREATE INDEX IDX_OLDVAL_STRING ON PHERF.PHERF_PROD_TEST_UNSALTED (OLDVAL_STRING)</ddl>
-                <ddl>CREATE INDEX IDX_CONNECTION_ID ON PHERF.PHERF_PROD_TEST_UNSALTED (CONNECTION_ID)</ddl>
+                <ddl statement="CREATE INDEX IDX_OLDVAL_STRING ON PHERF.PHERF_PROD_TEST_UNSALTED (OLDVAL_STRING)"/>
+                <ddl statement="CREATE INDEX IDX_CONNECTION_ID ON PHERF.PHERF_PROD_TEST_UNSALTED (CONNECTION_ID)"/>
             </postScenarioDdls>
             
             <writeParams executionDurationInMs="10000">
