http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java deleted file mode 100644 index 45073d9..0000000 --- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMExperimentSetRunner.java +++ /dev/null @@ -1,306 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.asterix.experiment.client; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.logging.Level; -import java.util.logging.Logger; -import java.util.regex.Pattern; - -import org.apache.asterix.experiment.action.base.SequentialActionList; -import org.apache.asterix.experiment.builder.AbstractExperimentBuilder; -import org.apache.asterix.experiment.builder.PerfTestAggBuilder; -import org.apache.asterix.experiment.builder.PresetClusterPerfBuilder; -import org.kohsuke.args4j.CmdLineException; -import org.kohsuke.args4j.CmdLineParser; -import org.kohsuke.args4j.Option; - -public class LSMExperimentSetRunner { - - private static final Logger LOGGER = Logger.getLogger(LSMExperimentSetRunner.class.getName()); - - public static class LSMExperimentSetRunnerConfig { - - private final String logDirSuffix; - - private final int nQueryRuns; - - public LSMExperimentSetRunnerConfig(String logDirSuffix, int nQueryRuns) { - this.logDirSuffix = logDirSuffix; - this.nQueryRuns = nQueryRuns; - } - - public String getLogDirSuffix() { - return logDirSuffix; - } - - public int getNQueryRuns() { - return nQueryRuns; - } - - @Option(name = "-rh", aliases = "--rest-host", usage = "Asterix REST API host address", required = true, metaVar = "HOST") - private String restHost; - - public String getRESTHost() { - return restHost; - } - - @Option(name = "-rp", aliases = "--rest-port", usage = "Asterix REST API port", required = true, metaVar = "PORT") - private int restPort; - - public int getRESTPort() { - return restPort; - } - - @Option(name = "-mh", aliases = "--managix-home", usage = "Path to MANAGIX_HOME directory", required = true, metaVar = "MGXHOME") - private String managixHome; - - public String getManagixHome() { - return managixHome; - } - - @Option(name = "-jh", aliases = "--java-home", usage = "Path to JAVA_HOME directory", required = true, metaVar = "JAVAHOME") - private String javaHome; - - public String getJavaHome() { - return 
javaHome; - } - - @Option(name = "-ler", aliases = "--local-experiment-root", usage = "Path to the local LSM experiment root directory", required = true, metaVar = "LOCALEXPROOT") - private String localExperimentRoot; - - public String getLocalExperimentRoot() { - return localExperimentRoot; - } - - @Option(name = "-u", aliases = "--username", usage = "Username to use for SSH/SCP", required = true, metaVar = "UNAME") - private String username; - - public String getUsername() { - return username; - } - - @Option(name = "-k", aliases = "--key", usage = "SSH key location", metaVar = "SSHKEY") - private String sshKeyLocation; - - public String getSSHKeyLocation() { - return sshKeyLocation; - } - - @Option(name = "-d", aliases = "--datagen-duration", usage = "Data generation duration in seconds", metaVar = "DATAGENDURATION") - private int duration; - - public int getDuration() { - return duration; - } - - @Option(name = "-qd", aliases = "--querygen-duration", usage = "Query generation duration in seconds", metaVar = "QUERYGENDURATION") - private int queryDuration; - - public int getQueryDuration() { - return queryDuration; - } - - @Option(name = "-regex", aliases = "--regex", usage = "Regular expression used to match experiment names", metaVar = "REGEXP") - private String regex; - - public String getRegex() { - return regex; - } - - @Option(name = "-oh", aliases = "--orchestrator-host", usage = "The host address of THIS orchestrator") - private String orchHost; - - public String getOrchestratorHost() { - return orchHost; - } - - @Option(name = "-op", aliases = "--orchestrator-port", usage = "The port to be used for the orchestrator server of THIS orchestrator") - private int orchPort; - - public int getOrchestratorPort() { - return orchPort; - } - - @Option(name = "-qoh", aliases = "--query-orchestrator-host", usage = "The host address of query orchestrator") - private String queryOrchHost; - - public String getQueryOrchestratorHost() { - return queryOrchHost; - } - - 
@Option(name = "-qop", aliases = "--query-orchestrator-port", usage = "The port to be used for the orchestrator server of query orchestrator") - private int queryOrchPort; - - public int getQueryOrchestratorPort() { - return queryOrchPort; - } - - @Option(name = "-di", aliases = "--data-interval", usage = " Initial data interval to use when generating data for exp 7") - private long dataInterval; - - public long getDataInterval() { - return dataInterval; - } - - @Option(name = "-ni", aliases = "--num-data-intervals", usage = "Number of data intervals to use when generating data for exp 7") - private int numIntervals; - - public int getNIntervals() { - return numIntervals; - } - - @Option(name = "-sf", aliases = "--stat-file", usage = "Enable IO/CPU stats and place in specified file") - private String statFile = null; - - public String getStatFile() { - return statFile; - } - - @Option(name = "-of", aliases = "--openstreetmap-filepath", usage = "The open street map gps point data file path") - private String openStreetMapFilePath; - - public String getOpenStreetMapFilePath() { - return openStreetMapFilePath; - } - - @Option(name = "-si", aliases = "--location-sample-interval", usage = "Location sample interval from open street map point data") - private int locationSampleInterval; - - public int getLocationSampleInterval() { - return locationSampleInterval; - } - - @Option(name = "-qsf", aliases = "--query-seed-filepath", usage = "The query seed file path") - private String querySeedFilePath; - - public String getQuerySeedFilePath() { - return querySeedFilePath; - } - - @Option(name = "-rcbi", aliases = "--record-count-per-batch-during-ingestion-only", usage = "Record count per batch during ingestion only") - private int recordCountPerBatchDuringIngestionOnly = 1000; - - public int getRecordCountPerBatchDuringIngestionOnly() { - return recordCountPerBatchDuringIngestionOnly; - } - - @Option(name = "-rcbq", aliases = "--record-count-per-batch-during-query", usage = 
"Record count per batch during query") - private int recordCountPerBatchDuringQuery = 1000; - - public int getRecordCountPerBatchDuringQuery() { - return recordCountPerBatchDuringQuery; - } - - @Option(name = "-dsti", aliases = "--data-gen-sleep-time-during-ingestion-only", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringIngestionOnly records were sent") - private long dataGenSleepTimeDuringIngestionOnly = 1; - - public long getDataGenSleepTimeDuringIngestionOnly() { - return dataGenSleepTimeDuringIngestionOnly; - } - - @Option(name = "-dstq", aliases = "--data-gen-sleep-time-during-query", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringQuery records were sent") - private long dataGenSleepTimeDuringQuery = 1; - - public long getDataGenSleepTimeDuringQuery() { - return dataGenSleepTimeDuringQuery; - } - } - - public static void main(String[] args) throws Exception { - // LogManager.getRootLogger().setLevel(org.apache.log4j.Level.OFF); - LSMExperimentSetRunnerConfig config = new LSMExperimentSetRunnerConfig(String.valueOf(System - .currentTimeMillis()), 3); - CmdLineParser clp = new CmdLineParser(config); - try { - clp.parseArgument(args); - } catch (CmdLineException e) { - System.err.println(e.getMessage()); - clp.printUsage(System.err); - System.exit(1); - } - - Collection<AbstractExperimentBuilder> suite = new ArrayList<>(); - - /* - suite.add(new Experiment7BBuilder(config)); - suite.add(new Experiment7DBuilder(config)); - suite.add(new Experiment7ABuilder(config)); - suite.add(new Experiment8DBuilder(config)); - suite.add(new Experiment8ABuilder(config)); - suite.add(new Experiment8BBuilder(config)); - suite.add(new Experiment9ABuilder(config)); - suite.add(new Experiment9DBuilder(config)); - suite.add(new Experiment9BBuilder(config)); - suite.add(new Experiment6ABuilder(config)); - suite.add(new Experiment6BBuilder(config)); - suite.add(new Experiment6CBuilder(config)); - suite.add(new 
Experiment2D1Builder(config)); - suite.add(new Experiment2D2Builder(config)); - suite.add(new Experiment2D4Builder(config)); - suite.add(new Experiment2D8Builder(config)); - suite.add(new Experiment2C1Builder(config)); - suite.add(new Experiment2C2Builder(config)); - suite.add(new Experiment2C4Builder(config)); - suite.add(new Experiment2C8Builder(config)); - suite.add(new Experiment2A1Builder(config)); - suite.add(new Experiment2A2Builder(config)); - suite.add(new Experiment2A4Builder(config)); - suite.add(new Experiment2A8Builder(config)); - suite.add(new Experiment2B1Builder(config)); - suite.add(new Experiment2B2Builder(config)); - suite.add(new Experiment2B4Builder(config)); - suite.add(new Experiment2B8Builder(config)); - suite.add(new Experiment1ABuilder(config)); - suite.add(new Experiment1BBuilder(config)); - suite.add(new Experiment1CBuilder(config)); - suite.add(new Experiment1DBuilder(config)); - suite.add(new Experiment1EBuilder(config)); - suite.add(new Experiment4ABuilder(config)); - suite.add(new Experiment4BBuilder(config)); - suite.add(new Experiment4CBuilder(config)); - suite.add(new Experiment4DBuilder(config)); - suite.add(new Experiment3ABuilder(config)); - suite.add(new Experiment3BBuilder(config)); - suite.add(new Experiment3CBuilder(config)); - suite.add(new Experiment3DBuilder(config)); - suite.add(new Experiment5ABuilder(config)); - suite.add(new Experiment5BBuilder(config)); - suite.add(new Experiment5CBuilder(config)); - suite.add(new Experiment5DBuilder(config)); - */ - suite.add(new PerfTestAggBuilder(config)); - suite.add(new PresetClusterPerfBuilder(config)); - - Pattern p = config.getRegex() == null ? 
null : Pattern.compile(config.getRegex()); - - SequentialActionList exps = new SequentialActionList(); - for (AbstractExperimentBuilder eb : suite) { - if (p == null || p.matcher(eb.getName()).matches()) { - exps.add(eb.build()); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Added " + eb.getName() + " to run list..."); - } - } - } - exps.perform(); - } -}
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java deleted file mode 100644 index 78483d1..0000000 --- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/LSMPerfConstants.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
/**
 * Holder of string constants (directory names and file names) for the experiment client
 * package. The class only exposes static fields; its constructor always throws.
 */
public class LSMPerfConstants {

    /** Directory name {@code "configs"}. */
    public static final String CONFIG_DIR = "configs";

    /** Directory name {@code "aql"}. */
    public static final String AQL_DIR = "aql";

    /** Directory name {@code "base"}. */
    public static final String BASE_DIR = "base";

    /** Directory name {@code "dgen"}. */
    public static final String DGEN_DIR = "dgen";

    /** Directory name {@code "log"}. */
    public static final String LOG_DIR = "log";

    /** Relative path {@code "base/perf_types.aql"}. */
    public static final String BASE_TYPES = "base/perf_types.aql";

    /** File name {@code "agg_results.csv"}. */
    public static final String RESULT_FILE = "agg_results.csv";

    /** File name {@code "asterix-configuration.xml"}. */
    public static final String ASTERIX_CONFIGURATION = "asterix-configuration.xml";

    /** Never needs to be instantiated; any attempt fails immediately. */
    private LSMPerfConstants() {
        throw new UnsupportedOperationException();
    }
}
/**
 * Message types exchanged over the orchestrator sockets.
 *
 * <p>The orchestrator servers in this package transmit a message as the constant's
 * {@code ordinal()} written with {@code DataOutputStream.writeInt} and decode it with
 * {@code values()[code]}. The declaration order below is therefore part of the wire format:
 * do not reorder or insert constants.
 */
public enum OrchestratorDGProtocol {

    /** Code 0: read by {@code OrchestratorServer} from every connection before it performs an interval's action. */
    STOPPED,

    /** Code 1: written by {@code OrchestratorServer} to every connection after an interval's action, except the last. */
    RESUME,

    /** Code 2: read by the orchestrator servers from every connection once per interval. */
    REACHED
}
See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.experiment.client; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.experiment.action.base.IAction; - -public class OrchestratorServer { - - private static final Logger LOGGER = Logger.getLogger(OrchestratorServer.class.getName()); - - private final int port; - - private final int nDataGens; - - private final int nIntervals; - - private final AtomicBoolean running; - - private final IAction[] protocolActions; - - private final boolean flagStopResume; - - public OrchestratorServer(int port, int nDataGens, int nIntervals, IAction[] protocolActions) { - this.port = port; - this.nDataGens = nDataGens; - this.nIntervals = nIntervals; - running = new AtomicBoolean(); - this.protocolActions = protocolActions; - this.flagStopResume = true; - } - - public synchronized void start() throws IOException, InterruptedException { - final AtomicBoolean bound = new AtomicBoolean(); - running.set(true); - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - try { - ServerSocket ss = new ServerSocket(port); - synchronized (bound) { - bound.set(true); - bound.notifyAll(); - } - Socket[] conn = new Socket[nDataGens]; - try { - for (int i = 0; i < nDataGens; i++) { - conn[i] = ss.accept(); - } - for (int n = 0; n < nIntervals; ++n) { - //TODO refactor operations according to the protocol message - if (flagStopResume) { - for (int i = 0; i < nDataGens; i++) { - receiveStopped(conn[i]); - } - protocolActions[n].perform(); - if (n != nIntervals - 1) { - for (int i = 0; i < nDataGens; i++) { - sendResume(conn[i]); - } - } - } else { - for (int i = 0; i < nDataGens; i++) { - 
receiveReached(conn[i]); - } - protocolActions[n].perform(); - } - } - } finally { - for (int i = 0; i < conn.length; ++i) { - if (conn[i] != null) { - conn[i].close(); - } - } - ss.close(); - } - running.set(false); - synchronized (OrchestratorServer.this) { - OrchestratorServer.this.notifyAll(); - } - } catch (Throwable t) { - t.printStackTrace(); - } - } - - }); - t.start(); - synchronized (bound) { - while (!bound.get()) { - bound.wait(); - } - } - } - - private void sendResume(Socket conn) throws IOException { - new DataOutputStream(conn.getOutputStream()).writeInt(OrchestratorDGProtocol.RESUME.ordinal()); - conn.getOutputStream().flush(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Sent " + OrchestratorDGProtocol.RESUME + " to " + conn.getRemoteSocketAddress()); - } - } - - private void receiveStopped(Socket conn) throws IOException { - int msg = new DataInputStream(conn.getInputStream()).readInt(); - OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg]; - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress()); - } - if (msgType != OrchestratorDGProtocol.STOPPED) { - throw new IllegalStateException("Encounted unknown message type " + msgType); - } - } - - private void receiveReached(Socket conn) throws IOException { - int msg = new DataInputStream(conn.getInputStream()).readInt(); - OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg]; - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress()); - } - if (msgType != OrchestratorDGProtocol.REACHED) { - throw new IllegalStateException("Encounted unknown message type " + msgType); - } - - } - - public synchronized void awaitFinished() throws InterruptedException { - while (running.get()) { - wait(); - } - } - -} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java deleted file mode 100644 index c547393..0000000 --- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer7.java +++ /dev/null @@ -1,238 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.asterix.experiment.client; - -import java.io.DataInputStream; -import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.experiment.action.base.IAction; -import org.apache.asterix.experiment.builder.AbstractExperiment7Builder; - -public class OrchestratorServer7 { - - private static final Logger LOGGER = Logger.getLogger(OrchestratorServer7.class.getName()); - - private final int port; - - private final int nDataGens; - - private final int nIntervals; - - private final AtomicBoolean running; - - private final IProtocolActionBuilder protoActionBuilder; - - private final IAction lsAction; - - private static final int QUERY_TOTAL_COUNT = 2000; - - public OrchestratorServer7(int port, int nDataGens, int nIntervals, IProtocolActionBuilder protoActionBuilder, IAction lsAction) { - this.port = port; - this.nDataGens = nDataGens; - this.nIntervals = nIntervals; - running = new AtomicBoolean(); - this.protoActionBuilder = protoActionBuilder; - this.lsAction = lsAction; - } - - public synchronized void start() throws IOException, InterruptedException { - final AtomicBoolean bound = new AtomicBoolean(); - running.set(true); - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - try { - ServerSocket ss = new ServerSocket(port); - synchronized (bound) { - bound.set(true); - bound.notifyAll(); - } - Socket[] conn = new Socket[nDataGens]; - try { - for (int i = 0; i < nDataGens; i++) { - conn[i] = ss.accept(); - } - AtomicInteger round = new AtomicInteger(); - AtomicBoolean done = new AtomicBoolean(false); - Thread pct = new Thread(new ProtocolConsumer(conn, nIntervals, round, done)); - pct.start(); - int[] queryType = new int[] { 10, 100, 1000, 10000 }; - int type = 0; - //step1. 
send query when it reaches the query begin round - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Step1 starts"); - } - boolean sendQuery = false; - while (!done.get()) { - if (!sendQuery) { - synchronized (round) { - while (true) { - if (round.get() >= AbstractExperiment7Builder.QUERY_BEGIN_ROUND) { - sendQuery = true; - break; - } - round.wait(); - } - } - } - if (sendQuery) { - protoActionBuilder.buildQueryAction(queryType[type % 4], false).perform(); - type = (++type) % 4; - } - - } - pct.join(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Step1 ends"); - } - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Step2 starts"); - } - //step2. send one more round of queries after ingestion is over - protoActionBuilder.buildIOWaitAction().perform(); - lsAction.perform(); - for (int i = 0; i < QUERY_TOTAL_COUNT; i++) { - protoActionBuilder.buildQueryAction(queryType[i % 4], true).perform(); - } - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Step2 ends"); - } - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Step3 starts"); - } - //step3. compact dataset - protoActionBuilder.buildCompactAction().perform(); - protoActionBuilder.buildIOWaitAction().perform(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Step3 ends"); - } - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Step4 starts"); - } - //step4. 
send last round of queries after the compaction is over - for (int i = 0; i < QUERY_TOTAL_COUNT; i++) { - protoActionBuilder.buildQueryAction(queryType[i % 4], true).perform(); - } - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Step4 ends"); - } - - } finally { - for (int i = 0; i < conn.length; ++i) { - if (conn[i] != null) { - conn[i].close(); - } - } - ss.close(); - } - running.set(false); - synchronized (OrchestratorServer7.this) { - OrchestratorServer7.this.notifyAll(); - } - } catch (Throwable t) { - t.printStackTrace(); - } - } - - }); - t.start(); - synchronized (bound) { - while (!bound.get()) { - bound.wait(); - } - } - } - - private static class ProtocolConsumer implements Runnable { - - private final Socket[] conn; - - private final int nIntervals; - - private final AtomicInteger interval; - - private final AtomicBoolean done; - - public ProtocolConsumer(Socket[] conn, int nIntervals, AtomicInteger interval, AtomicBoolean done) { - this.conn = conn; - this.nIntervals = nIntervals; - this.interval = interval; - this.done = done; - } - - @Override - public void run() { - interval.set(0); - try { - for (int n = 0; n < nIntervals; ++n) { - for (int i = 0; i < conn.length; i++) { - receiveReached(conn[i]); - } - synchronized (interval) { - interval.getAndIncrement(); - interval.notifyAll(); - } - } - done.set(true); - } catch (Exception e) { - e.printStackTrace(); - } - } - - } - - private static void receiveReached(Socket conn) throws IOException { - int msg = new DataInputStream(conn.getInputStream()).readInt(); - OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg]; - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress()); - } - if (msgType != OrchestratorDGProtocol.REACHED) { - throw new IllegalStateException("Encounted unknown message type " + msgType); - } - - } - - public synchronized void awaitFinished() throws InterruptedException { - while (running.get()) { - 
wait(); - } - } - - public interface IProtocolActionBuilder { - public IAction buildQueryAction(long cardinality, boolean finalRound) throws Exception; - - public IAction buildIOWaitAction() throws Exception; - - public IAction buildCompactAction() throws Exception; - - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java deleted file mode 100644 index be50e7c..0000000 --- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/OrchestratorServer9.java +++ /dev/null @@ -1,164 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.asterix.experiment.client; - -import java.io.DataInputStream; -import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.experiment.action.base.IAction; - -public class OrchestratorServer9 { - - private static final Logger LOGGER = Logger.getLogger(OrchestratorServer9.class.getName()); - - private final int port; - - private final int nDataGens; - - private final int nIntervals; - - private final AtomicBoolean running; - - private final IProtocolActionBuilder protoActionBuilder; - - public OrchestratorServer9(int port, int nDataGens, int nIntervals, IProtocolActionBuilder protoActionBuilder) { - this.port = port; - this.nDataGens = nDataGens; - this.nIntervals = nIntervals; - running = new AtomicBoolean(); - this.protoActionBuilder = protoActionBuilder; - } - - public synchronized void start() throws IOException, InterruptedException { - final AtomicBoolean bound = new AtomicBoolean(); - running.set(true); - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - try { - ServerSocket ss = new ServerSocket(port); - synchronized (bound) { - bound.set(true); - bound.notifyAll(); - } - Socket[] conn = new Socket[nDataGens]; - try { - for (int i = 0; i < nDataGens; i++) { - conn[i] = ss.accept(); - } - AtomicInteger round = new AtomicInteger(); - AtomicBoolean done = new AtomicBoolean(false); - Thread pct = new Thread(new ProtocolConsumer(conn, nIntervals, round, done)); - pct.start(); - while (!done.get()) { - protoActionBuilder.buildAction(round.get()).perform(); - } - pct.join(); - } finally { - for (int i = 0; i < conn.length; ++i) { - if (conn[i] != null) { - conn[i].close(); - } - } - ss.close(); - } - running.set(false); - synchronized (OrchestratorServer9.this) { - 
OrchestratorServer9.this.notifyAll(); - } - } catch (Throwable t) { - t.printStackTrace(); - } - } - - }); - t.start(); - synchronized (bound) { - while (!bound.get()) { - bound.wait(); - } - } - } - - private static class ProtocolConsumer implements Runnable { - - private final Socket[] conn; - - private final int nIntervals; - - private final AtomicInteger interval; - - private final AtomicBoolean done; - - public ProtocolConsumer(Socket[] conn, int nIntervals, AtomicInteger interval, AtomicBoolean done) { - this.conn = conn; - this.nIntervals = nIntervals; - this.interval = interval; - this.done = done; - } - - @Override - public void run() { - interval.set(0); - try { - for (int n = 0; n < nIntervals; ++n) { - for (int i = 0; i < conn.length; i++) { - receiveReached(conn[i]); - } - interval.getAndIncrement(); - } - done.set(true); - } catch (Exception e) { - e.printStackTrace(); - } - } - - } - - private static void receiveReached(Socket conn) throws IOException { - int msg = new DataInputStream(conn.getInputStream()).readInt(); - OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg]; - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress()); - } - if (msgType != OrchestratorDGProtocol.REACHED) { - throw new IllegalStateException("Encounted unknown message type " + msgType); - } - - } - - public synchronized void awaitFinished() throws InterruptedException { - while (running.get()) { - wait(); - } - } - - public interface IProtocolActionBuilder { - public IAction buildAction(int round) throws Exception; - } - -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java 
b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java deleted file mode 100644 index 22e5ac0..0000000 --- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/RecordCountingServer.java +++ /dev/null @@ -1,166 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.asterix.experiment.client; - -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.Reader; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; - -public class RecordCountingServer { - - private final ExecutorService threadPool; - - private final int port; - - private final long duration; - - private ServerSocket ss; - - private boolean stopped; - - private final Object o = new Object(); - - final AtomicBoolean b = new AtomicBoolean(false); - - public RecordCountingServer(int port, long duration) { - this.port = port; - this.duration = duration; - threadPool = Executors.newCachedThreadPool(); - } - - public void start() throws IOException, InterruptedException { - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - try { - stopped = false; - ss = new ServerSocket(port); - while (true) { - Socket s = ss.accept(); - if (stopped) { - break; - } - threadPool.execute(new RecordCountingThread(s, duration)); - synchronized (o) { - b.set(true); - o.notifyAll(); - } - } - } catch (Exception e) { - e.printStackTrace(); - } - } - }); - t.start(); - } - - public void awaitFirstConnection() throws InterruptedException { - synchronized (o) { - if (!b.get()) { - o.wait(); - } - } - } - - public void stop() throws IOException, InterruptedException { - stopped = true; - threadPool.shutdown(); - threadPool.awaitTermination(1000, TimeUnit.DAYS); - ss.close(); - } - - private static class RecordCountingThread implements Runnable { - private final Socket s; - - private final long duration; - - private final char[] buf; - - private int index; - - private int count; - - public RecordCountingThread(Socket s, long duration) { - this.s = s; - this.duration = duration; - buf = new char[32 * 1024]; - } - - @Override - public void run() { - 
count = 0; - index = 0; - long start = System.currentTimeMillis(); - try { - InputStreamReader r = new InputStreamReader(s.getInputStream()); - while (System.currentTimeMillis() - start < duration) { - fill(r); - countRecords(); - } - } catch (IOException e) { - e.printStackTrace(); - } - long end = System.currentTimeMillis(); - System.out.println("Read " + count + " records in " + (end - start) / 1000 + " seconds"); - } - - private void countRecords() { - for (int i = 0; i < index; ++i) { - if (buf[i] == '\n') { - ++count; - } - } - } - - private void fill(Reader r) throws IOException { - index = 0; - int read = r.read(buf); - if (read == -1) { - index = 0; - return; - } - index += read; - } - } - - public static void main(String[] args) throws Exception { - long duration = Long.parseLong(args[0]); - int port1 = Integer.parseInt(args[1]); - int port2 = Integer.parseInt(args[2]); - RecordCountingServer rcs1 = new RecordCountingServer(port1, duration * 1000); - RecordCountingServer rcs2 = new RecordCountingServer(port2, duration * 1000); - try { - rcs1.start(); - rcs2.start(); - rcs1.awaitFirstConnection(); - rcs2.awaitFirstConnection(); - } finally { - rcs1.stop(); - rcs2.stop(); - } - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java deleted file mode 100644 index eeac0b4..0000000 --- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketDataGeneratorExecutable.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.experiment.client; - -import java.net.Socket; -import java.util.Collections; - -import org.apache.asterix.experiment.action.base.AbstractAction; -import org.apache.asterix.tools.external.data.TweetGeneratorForSpatialIndexEvaluation; - -public class SocketDataGeneratorExecutable extends AbstractAction { - - private final String adapterHost; - - private final int adapterPort; - - public SocketDataGeneratorExecutable(String adapterHost, int adapterPort) { - this.adapterHost = adapterHost; - this.adapterPort = adapterPort; - } - - @Override - protected void doPerform() throws Exception { - Thread.sleep(4000); - Socket s = new Socket(adapterHost, adapterPort); - try { - TweetGeneratorForSpatialIndexEvaluation tg = new TweetGeneratorForSpatialIndexEvaluation(Collections.<String, String> emptyMap(), 0, - TweetGeneratorForSpatialIndexEvaluation.OUTPUT_FORMAT_ADM_STRING, s.getOutputStream()); - long start = System.currentTimeMillis(); - while (tg.setNextRecordBatch(1000)) { - } - long end = System.currentTimeMillis(); - long total = end - start; - System.out.println("Generation finished: " + tg.getNumFlushedTweets() + " in " + total / 1000 + " seconds"); - } finally { - s.close(); - } - } - -} 
http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java deleted file mode 100644 index f817fc9..0000000 --- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGenerator.java +++ /dev/null @@ -1,389 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.asterix.experiment.client; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.OutputStream; -import java.net.InetAddress; -import java.net.Socket; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.asterix.tools.external.data.TweetGeneratorForSpatialIndexEvaluation; -import org.apache.commons.lang3.tuple.Pair; - -public class SocketTweetGenerator { - - private final ExecutorService threadPool; - - private final int partitionRangeStart; - - private final int dataGenDuration; - - private final int queryGenDuration; - - private final long startDataInterval; - - private final int nDataIntervals; - - private final String orchHost; - - private final int orchPort; - - private final List<Pair<String, Integer>> receiverAddresses; - - private final String openStreetMapFilePath; - private final int locationSampleInterval; - private final int recordCountPerBatchDuringIngestionOnly; - private final int recordCountPerBatchDuringQuery; - private final long dataGenSleepTimeDuringIngestionOnly; - private final long dataGenSleepTimeDuringQuery; - - private final Mode mode; - - private enum Mode { - TIME, - DATA - } - - public SocketTweetGenerator(SocketTweetGeneratorConfig config) { - threadPool = Executors.newCachedThreadPool(new ThreadFactory() { - - private final AtomicInteger count = new AtomicInteger(); - - @Override - public Thread newThread(Runnable r) { - int tid = count.getAndIncrement(); - Thread t = new Thread(r, "DataGeneratorThread: " + tid); - t.setDaemon(true); - return t; - } - }); - - partitionRangeStart = config.getPartitionRangeStart(); - 
dataGenDuration = config.getDataGenDuration(); - queryGenDuration = config.getQueryGenDuration(); - startDataInterval = config.getDataInterval(); - nDataIntervals = config.getNIntervals(); - orchHost = config.getOrchestratorHost(); - orchPort = config.getOrchestratorPort(); - receiverAddresses = config.getAddresses(); - mode = startDataInterval > 0 ? Mode.DATA : Mode.TIME; - openStreetMapFilePath = config.getOpenStreetMapFilePath(); - locationSampleInterval = config.getLocationSampleInterval(); - recordCountPerBatchDuringIngestionOnly = config.getRecordCountPerBatchDuringIngestionOnly(); - recordCountPerBatchDuringQuery = config.getRecordCountPerBatchDuringQuery(); - dataGenSleepTimeDuringIngestionOnly = config.getDataGenSleepTimeDuringIngestionOnly(); - dataGenSleepTimeDuringQuery = config.getDataGenSleepTimeDuringQuery(); - } - - public void start() throws Exception { - final Semaphore sem = new Semaphore((receiverAddresses.size() - 1) * -1); - int i = 0; - for (Pair<String, Integer> address : receiverAddresses) { - threadPool.submit(new DataGenerator(mode, sem, address.getLeft(), address.getRight(), i - + partitionRangeStart, dataGenDuration, queryGenDuration, nDataIntervals, startDataInterval, - orchHost, orchPort, openStreetMapFilePath, locationSampleInterval, - recordCountPerBatchDuringIngestionOnly, recordCountPerBatchDuringQuery, - dataGenSleepTimeDuringIngestionOnly, dataGenSleepTimeDuringQuery)); - ++i; - } - sem.acquire(); - } - - public static class DataGenerator implements Runnable { - - private static final Logger LOGGER = Logger.getLogger(DataGenerator.class.getName()); - - private final Mode m; - private final Semaphore sem; - private final String host; - private final int port; - private final int partition; - private final int dataGenDuration; - private final int queryGenDuration; - private final int nDataIntervals; - private final String orchHost; - private final int orchPort; - - private int currentInterval; - private long nextStopInterval; - 
private final long dataSizeInterval; - private final boolean flagStopResume; - private final String openStreetMapFilePath; - private final int locationSampleInterval; - private final int recordCountPerBatchDuringIngestionOnly; - private final int recordCountPerBatchDuringQuery; - private final long dataGenSleepTimeDuringIngestionOnly; - private final long dataGenSleepTimeDuringQuery; - - public DataGenerator(Mode m, Semaphore sem, String host, int port, int partition, int dataGenDuration, - int queryGenDuration, int nDataIntervals, long dataSizeInterval, String orchHost, int orchPort, - String openStreetMapFilePath, int locationSampleInterval, int recordCountPerBatchDuringIngestionOnly, - int recordCountPerBatchDuringQuery, long dataGenSleepTimeDuringIngestionOnly, - long dataGenSleepTimeDuringQuery) { - this.m = m; - this.sem = sem; - this.host = host; - this.port = port; - this.partition = partition; - this.dataGenDuration = dataGenDuration; - this.queryGenDuration = queryGenDuration; - this.nDataIntervals = nDataIntervals; - currentInterval = 0; - this.dataSizeInterval = dataSizeInterval; - this.nextStopInterval = dataSizeInterval; - this.orchHost = orchHost; - this.orchPort = orchPort; - this.flagStopResume = false; - this.openStreetMapFilePath = openStreetMapFilePath; - //simple heuristic to generate different data from different data generator. - int lsi = locationSampleInterval + (partition + 1) * (partition <= 4 ? 
7 : 9); - this.locationSampleInterval = lsi; - this.recordCountPerBatchDuringIngestionOnly = recordCountPerBatchDuringIngestionOnly; - this.recordCountPerBatchDuringQuery = recordCountPerBatchDuringQuery; - this.dataGenSleepTimeDuringIngestionOnly = dataGenSleepTimeDuringIngestionOnly; - this.dataGenSleepTimeDuringQuery = dataGenSleepTimeDuringQuery; - } - - @Override - public void run() { - LOGGER.info("\nDataGen[" + partition + "] running with the following parameters: \n" + "dataGenDuration : " - + dataGenDuration + "\n" + "queryGenDuration : " + queryGenDuration + "\n" + "nDataIntervals : " - + nDataIntervals + "\n" + "dataSizeInterval : " + dataSizeInterval + "\n" - + "recordCountPerBatchDuringIngestionOnly : " + recordCountPerBatchDuringIngestionOnly + "\n" - + "recordCountPerBatchDuringQuery : " + recordCountPerBatchDuringQuery + "\n" - + "dataGenSleepTimeDuringIngestionOnly : " + dataGenSleepTimeDuringIngestionOnly + "\n" - + "dataGenSleepTimeDuringQuery : " + dataGenSleepTimeDuringQuery + "\n" - + "locationSampleInterval : " + locationSampleInterval); - - try { - Socket s = new Socket(host, port); - try { - Socket orchSocket = null; - if (m == Mode.DATA && orchHost != null) { - orchSocket = new Socket(orchHost, orchPort); - } - TweetGeneratorForSpatialIndexEvaluation tg = null; - try { - Map<String, String> config = new HashMap<>(); - String durationVal = m == Mode.TIME ? 
String.valueOf(dataGenDuration) : "0"; - config.put(TweetGeneratorForSpatialIndexEvaluation.KEY_DURATION, String.valueOf(durationVal)); - if (openStreetMapFilePath != null) { - config.put(TweetGeneratorForSpatialIndexEvaluation.KEY_OPENSTREETMAP_FILEPATH, - openStreetMapFilePath); - config.put(TweetGeneratorForSpatialIndexEvaluation.KEY_LOCATION_SAMPLE_INTERVAL, - String.valueOf(locationSampleInterval)); - } - tg = new TweetGeneratorForSpatialIndexEvaluation(config, partition, - TweetGeneratorForSpatialIndexEvaluation.OUTPUT_FORMAT_ADM_STRING, s.getOutputStream()); - long startTS = System.currentTimeMillis(); - long prevTS = startTS; - long curTS = startTS; - int round = 0; - while (tg.setNextRecordBatch(recordCountPerBatchDuringIngestionOnly)) { - if (m == Mode.DATA) { - if (tg.getNumFlushedTweets() >= nextStopInterval) { - //TODO stop/resume option - if (orchSocket != null) { - if (flagStopResume) { - // send stop to orchestrator - sendStopped(orchSocket); - } else { - sendReached(orchSocket); - } - } - - // update intervals - // TODO give options: exponential/linear interval - nextStopInterval += dataSizeInterval; - if (++currentInterval >= nDataIntervals) { - break; - } - - if (orchSocket != null) { - if (flagStopResume) { - receiveResume(orchSocket); - } - } - } - } - curTS = System.currentTimeMillis(); - if (LOGGER.isLoggable(Level.INFO)) { - round++; - if ((round * recordCountPerBatchDuringIngestionOnly) % 100000 == 0) { - System.out.println("DataGen[" + partition - + "][During ingestion only][TimeToInsert100000] " + (curTS - prevTS) - + " in milliseconds"); - round = 0; - prevTS = curTS; - } - } - //to prevent congestion in feed pipe line. 
- if (dataGenSleepTimeDuringIngestionOnly > 0) { - Thread.sleep(dataGenSleepTimeDuringIngestionOnly); - } - } - - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("DataGen[" + partition - + "][During ingestion only][InsertCount] Num tweets flushed = " - + tg.getNumFlushedTweets() + " in " - + ((System.currentTimeMillis() - startTS) / 1000) + " seconds from " - + InetAddress.getLocalHost() + " to " + host + ":" + port); - } - - if (orchSocket != null && queryGenDuration > 0) { - //wait until orchestrator server's resume message is received. - receiveResume(orchSocket); - - //reset duration and flushed tweet count - tg.resetDurationAndFlushedTweetCount(queryGenDuration); - - prevTS = System.currentTimeMillis(); - round = 0; - //start sending record - while (tg.setNextRecordBatch(recordCountPerBatchDuringQuery)) { - curTS = System.currentTimeMillis(); - if (LOGGER.isLoggable(Level.INFO)) { - round++; - if ((round * recordCountPerBatchDuringQuery) % 100000 == 0) { - System.out.println("DataGen[" + partition - + "][During ingestion + queries][TimeToInsert100000] " - + (curTS - prevTS) + " in milliseconds"); - round = 0; - prevTS = curTS; - } - } - if (dataGenSleepTimeDuringQuery > 0) { - Thread.sleep(dataGenSleepTimeDuringQuery); - } - } - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("DataGen[" + partition - + "][During ingestion + queries][InsertCount] Num tweets flushed = " - + tg.getNumFlushedTweets() + " in " + queryGenDuration + " seconds from " - + InetAddress.getLocalHost() + " to " + host + ":" + port); - } - //send reached message to orchestrator server - sendReached(orchSocket); - } - - } finally { - if (orchSocket != null) { - orchSocket.close(); - } - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Num tweets flushed = " + tg.getNumFlushedTweets() + " in " + dataGenDuration - + " seconds from " + InetAddress.getLocalHost() + " to " + host + ":" + port); - } - } - } catch (Throwable t) { - t.printStackTrace(); - } finally { - s.close(); - } - } 
catch (Throwable t) { - System.err.println("Error connecting to " + host + ":" + port); - t.printStackTrace(); - } finally { - sem.release(); - } - } - - private void sendReached(Socket s) throws IOException { - new DataOutputStream(s.getOutputStream()).writeInt(OrchestratorDGProtocol.REACHED.ordinal()); - s.getOutputStream().flush(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Sent " + OrchestratorDGProtocol.REACHED + " to " + s.getRemoteSocketAddress()); - } - } - - private void receiveResume(Socket s) throws IOException { - int msg = new DataInputStream(s.getInputStream()).readInt(); - OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg]; - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Received " + msgType + " from " + s.getRemoteSocketAddress()); - } - if (msgType != OrchestratorDGProtocol.RESUME) { - throw new IllegalStateException("Unknown protocol message received: " + msgType); - } - } - - private void sendStopped(Socket s) throws IOException { - new DataOutputStream(s.getOutputStream()).writeInt(OrchestratorDGProtocol.STOPPED.ordinal()); - s.getOutputStream().flush(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Sent " + OrchestratorDGProtocol.STOPPED + " to " + s.getRemoteSocketAddress()); - } - } - - } - - private static class CircularByteArrayOutputStream extends OutputStream { - - private final byte[] buf; - - private int index; - - public CircularByteArrayOutputStream() { - buf = new byte[32 * 1024]; - index = 0; - } - - @Override - public void write(byte b[], int off, int len) throws IOException { - if (b == null) { - throw new NullPointerException(); - } else if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) > b.length) || ((off + len) < 0)) { - throw new IndexOutOfBoundsException(); - } else if (len == 0) { - return; - } - - int remain = len; - int remainOff = off; - while (remain > 0) { - int avail = buf.length - index; - System.arraycopy(b, remainOff, buf, index, avail); - remainOff += avail; 
- remain -= avail; - index = (index + avail) % buf.length; - } - } - - @Override - public void write(int b) throws IOException { - buf[index] = (byte) b; - index = (index + 1) % buf.length; - } - - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java deleted file mode 100644 index d3bb4bd..0000000 --- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorConfig.java +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.asterix.experiment.client; - -import java.util.List; - -import org.apache.commons.lang3.tuple.Pair; -import org.kohsuke.args4j.Argument; -import org.kohsuke.args4j.CmdLineException; -import org.kohsuke.args4j.CmdLineParser; -import org.kohsuke.args4j.Option; -import org.kohsuke.args4j.OptionDef; -import org.kohsuke.args4j.spi.OptionHandler; -import org.kohsuke.args4j.spi.Parameters; -import org.kohsuke.args4j.spi.Setter; - -public class SocketTweetGeneratorConfig { - - @Option(name = "-p", aliases = "--partition-range-start", usage = "Starting partition number for the set of data generators (default = 0)") - private int partitionRangeStart = 0; - - public int getPartitionRangeStart() { - return partitionRangeStart; - } - - @Option(name = "-d", aliases = { "--datagen-duration" }, usage = "Duration in seconds to run data generation") - private int duration = -1; - - public int getDataGenDuration() { - return duration; - } - - @Option(name = "-qd", aliases = { "--querygen-duration" }, usage = "Duration in seconds to run query generation") - private int queryDuration = -1; - - public int getQueryGenDuration() { - return queryDuration; - } - - @Option(name = "-di", aliases = "--data-interval", usage = "Initial data interval to use when generating data based on data size") - private long dataInterval = -1; - - public long getDataInterval() { - return dataInterval; - } - - @Option(name = "-ni", aliases = "--num-intervals", usage = "Number of intervals to use when generating data based on data size (default = 4)") - private int nIntervals = 4; - - public int getNIntervals() { - return nIntervals; - } - - @Option(name = "-oh", aliases = "--orachestrator-host", usage = "The host name of the orchestrator") - private String orchHost; - - public String getOrchestratorHost() { - return orchHost; - } - - @Option(name = "-op", aliases = "--orchestrator-port", usage = "The port number of the orchestrator") - private int orchPort; - - public int 
getOrchestratorPort() { - return orchPort; - } - - @Option(name = "-of", aliases = "--openstreetmap-filepath", usage = "The open street map gps point data file path") - private String openStreetMapFilePath; - - public String getOpenStreetMapFilePath() { - return openStreetMapFilePath; - } - - @Option(name = "-si", aliases = "--location-sample-interval", usage = "Location sample interval from open street map point data") - private int locationSampleInterval; - - public int getLocationSampleInterval() { - return locationSampleInterval; - } - - @Option(name = "-rcbi", aliases = "--record-count-per-batch-during-ingestion-only", usage = "Record count per batch during ingestion only") - private int recordCountPerBatchDuringIngestionOnly = 1000; - - public int getRecordCountPerBatchDuringIngestionOnly() { - return recordCountPerBatchDuringIngestionOnly; - } - - @Option(name = "-rcbq", aliases = "--record-count-per-batch-during-query", usage = "Record count per batch during query") - private int recordCountPerBatchDuringQuery = 1000; - - public int getRecordCountPerBatchDuringQuery() { - return recordCountPerBatchDuringQuery; - } - - @Option(name = "-dsti", aliases = "--data-gen-sleep-time-during-ingestion-only", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringIngestionOnly records were sent") - private long dataGenSleepTimeDuringIngestionOnly = 1; - - public long getDataGenSleepTimeDuringIngestionOnly() { - return dataGenSleepTimeDuringIngestionOnly; - } - - @Option(name = "-dstq", aliases = "--data-gen-sleep-time-during-query", usage = "DataGen sleep time in milliseconds after every recordCountPerBatchDuringQuery records were sent") - private long dataGenSleepTimeDuringQuery = 1; - - public long getDataGenSleepTimeDuringQuery() { - return dataGenSleepTimeDuringQuery; - } - - @Argument(required = true, usage = "A list of <ip>:<port> pairs (addresses) to send data to", metaVar = "addresses...", handler = AddressOptionHandler.class) - 
private List<Pair<String, Integer>> addresses; - - public List<Pair<String, Integer>> getAddresses() { - return addresses; - } - - public static class AddressOptionHandler extends OptionHandler<Pair<String, Integer>> { - - public AddressOptionHandler(CmdLineParser parser, OptionDef option, Setter<? super Pair<String, Integer>> setter) { - super(parser, option, setter); - } - - @Override - public int parseArguments(Parameters params) throws CmdLineException { - int counter = 0; - while (true) { - String param; - try { - param = params.getParameter(counter); - } catch (CmdLineException ex) { - break; - } - - String[] hostPort = param.split(":"); - if (hostPort.length != 2) { - throw new CmdLineException("Invalid address: " + param + ". Expected <host>:<port>"); - } - Integer port = null; - try { - port = Integer.parseInt(hostPort[1]); - } catch (NumberFormatException e) { - throw new CmdLineException("Invalid port " + hostPort[1] + " for address " + param + "."); - } - setter.addValue(Pair.of(hostPort[0], port)); - counter++; - } - return counter; - } - - @Override - public String getDefaultMetaVariable() { - return "addresses"; - } - - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java ---------------------------------------------------------------------- diff --git a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java deleted file mode 100644 index d64809f..0000000 --- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SocketTweetGeneratorDriver.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.asterix.experiment.client; - -import org.kohsuke.args4j.CmdLineException; -import org.kohsuke.args4j.CmdLineParser; - -public class SocketTweetGeneratorDriver { - public static void main(String[] args) throws Exception { - SocketTweetGeneratorConfig clientConfig = new SocketTweetGeneratorConfig(); - CmdLineParser clp = new CmdLineParser(clientConfig); - try { - clp.parseArgument(args); - } catch (CmdLineException e) { - System.err.println(e.getMessage()); - clp.printUsage(System.err); - System.exit(1); - } - - if ((clientConfig.getDataInterval() == -1 && clientConfig.getDataGenDuration() == -1) - || (clientConfig.getDataInterval() > 0 && clientConfig.getDataGenDuration() > 0)) { - System.err.println("Must use exactly one of -d or -di"); - clp.printUsage(System.err); - System.exit(1); - } - - SocketTweetGenerator client = new SocketTweetGenerator(clientConfig); - client.start(); - } -} http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0e21afa7/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java ---------------------------------------------------------------------- diff --git 
a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java b/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java deleted file mode 100644 index 4e360dd..0000000 --- a/asterixdb/asterix-experiments/src/main/java/org/apache/asterix/experiment/client/SpatialIndexExperiment2OrchestratorServer.java +++ /dev/null @@ -1,179 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.asterix.experiment.client; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.net.ServerSocket; -import java.net.Socket; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.logging.Level; -import java.util.logging.Logger; - -public class SpatialIndexExperiment2OrchestratorServer { - - private static final Logger LOGGER = Logger.getLogger(SpatialIndexExperiment2OrchestratorServer.class.getName()); - - private final int dataGenPort; - - private final int queryGenPort; - - private final int nDataGens; - - private final int nQueryGens; - - private final int nIntervals; - - private final AtomicBoolean running; - - public SpatialIndexExperiment2OrchestratorServer(int dataGenPort, int nDataGens, int nIntervals, int queryGenPort, - int nQueryGens) { - this.dataGenPort = dataGenPort; - this.nDataGens = nDataGens; - this.queryGenPort = queryGenPort; - this.nQueryGens = nQueryGens; - this.nIntervals = nIntervals; - running = new AtomicBoolean(); - } - - public synchronized void start() throws IOException, InterruptedException { - final AtomicBoolean dataGenBound = new AtomicBoolean(); - final AtomicBoolean queryGenBound = new AtomicBoolean(); - running.set(true); - Thread t = new Thread(new Runnable() { - - @Override - public void run() { - try { - ServerSocket dataGenSS = new ServerSocket(dataGenPort); - synchronized (dataGenBound) { - dataGenBound.set(true); - dataGenBound.notifyAll(); - } - ServerSocket queryGenSS = new ServerSocket(queryGenPort); - synchronized (queryGenBound) { - queryGenBound.set(true); - queryGenBound.notifyAll(); - } - - Socket[] dataConn = new Socket[nDataGens]; - Socket[] queryConn = new Socket[nQueryGens]; - try { - //#.wait until all dataGens and queryGens have connected to the orchestrator - for (int i = 0; i < nDataGens; i++) { - dataConn[i] = dataGenSS.accept(); - } - for (int i = 0; i < nQueryGens; i++) { - queryConn[i] = 
queryGenSS.accept(); - } - - //#.wait until queryGens are ready for generating query - for (int i = 0; i < nQueryGens; i++) { - receiveReached(queryConn[i]); - } - - //#.wait until dataGens are ready for generating data after nIntervals of data were generated - for (int i = 0; i < nIntervals; i++) { - for (int j = 0; j < nDataGens; j++) { - receiveReached(dataConn[j]); - } - } - - //#.send signal to queryGens to start sending queries - for (int i = 0; i < nQueryGens; i++) { - sendResume(queryConn[i]); - } - //#.send signal to dataGens to start sending records - for (int i = 0; i < nDataGens; i++) { - sendResume(dataConn[i]); - } - - //#.wait until both dataGen and queryGen's are done - for (int i = 0; i < nQueryGens; i++) { - receiveReached(queryConn[i]); - } - for (int i = 0; i < nDataGens; i++) { - receiveReached(dataConn[i]); - } - - } finally { - for (int i = 0; i < nDataGens; ++i) { - if (dataConn[i] != null) { - dataConn[i].close(); - } - } - dataGenSS.close(); - for (int i = 0; i < nQueryGens; ++i) { - if (queryConn[i] != null) { - queryConn[i].close(); - } - } - queryGenSS.close(); - } - running.set(false); - synchronized (SpatialIndexExperiment2OrchestratorServer.this) { - SpatialIndexExperiment2OrchestratorServer.this.notifyAll(); - } - } catch (Throwable t) { - t.printStackTrace(); - } - } - - }); - t.start(); - synchronized (dataGenBound) { - while (!dataGenBound.get()) { - dataGenBound.wait(); - } - } - synchronized (queryGenBound) { - while (!queryGenBound.get()) { - queryGenBound.wait(); - } - } - } - - private static void receiveReached(Socket conn) throws IOException { - int msg = new DataInputStream(conn.getInputStream()).readInt(); - OrchestratorDGProtocol msgType = OrchestratorDGProtocol.values()[msg]; - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Received " + msgType + " from " + conn.getRemoteSocketAddress()); - } - if (msgType != OrchestratorDGProtocol.REACHED) { - throw new IllegalStateException("Encounted unknown message type " + 
msgType); - } - } - - private void sendResume(Socket s) throws IOException { - new DataOutputStream(s.getOutputStream()).writeInt(OrchestratorDGProtocol.RESUME.ordinal()); - s.getOutputStream().flush(); - if (LOGGER.isLoggable(Level.INFO)) { - LOGGER.info("Sent " + OrchestratorDGProtocol.RESUME + " to " + s.getRemoteSocketAddress()); - } - } - - public synchronized void awaitFinished() throws InterruptedException { - while (running.get()) { - wait(); - } - } - -}
