A collection of changes to the benchmark scripts: - Added more logging. - Created partitioned files instead of monthly files. - Cleaned up unused code. - Added a script for MRQL tests.
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/481f9edb Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/481f9edb Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/481f9edb Branch: refs/heads/site Commit: 481f9edbd118e202671915a2605cb5540ad98723 Parents: d76e4ed Author: Preston Carman <[email protected]> Authored: Fri Jun 27 14:26:25 2014 -0700 Committer: Preston Carman <[email protected]> Committed: Fri Jun 27 14:26:25 2014 -0700 ---------------------------------------------------------------------- .../noaa-ghcn-daily/scripts/run_benchmark.sh | 7 +- .../scripts/run_benchmark_cluster.sh | 59 +++--- .../noaa-ghcn-daily/scripts/run_mrql_test.sh | 29 +++ .../scripts/weather_benchmark.py | 138 ++++++++++--- .../noaa-ghcn-daily/scripts/weather_cli.py | 11 +- .../noaa-ghcn-daily/scripts/weather_config.py | 12 +- .../scripts/weather_convert_to_xml.py | 200 +------------------ .../scripts/weather_data_files.py | 107 +++++----- 8 files changed, 255 insertions(+), 308 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh index 2dd070c..b2b1531 100755 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh @@ -25,6 +25,8 @@ # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ "-client-net-ip-address 169.235.27.138" # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ "" q03 # +REPEAT=5 +FRAME_SIZE=10000 if [ -z "${1}" ] then @@ 
-32,15 +34,18 @@ then exit fi +export JAVA_OPTS="$JAVA_OPTS -server -Xmx8G -XX:+HeapDumpOnOutOfMemoryError -Djava.util.logging.config.file=./vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/benchmark_logging.properties" + for j in $(find ${1} -name '*q??.xq') do if [ -z "${3}" ] || [[ "${j}" =~ "${3}" ]] then + date echo "Running query: ${j}" log_file="$(basename ${j}).$(date +%Y%m%d%H%M).log" log_base_path=$(dirname ${j/queries/query_logs}) mkdir -p ${log_base_path} - time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${2} -timing -showquery -showoet -showrp -frame-size 10000 -repeatexec 10 > ${log_base_path}/${log_file} 2>&1 + time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${2} -timing -showquery -showoet -showrp -frame-size ${FRAME_SIZE} -repeatexec ${REPEAT} > ${log_base_path}/${log_file} 2>&1 fi; done http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh index a77f3c2..f868024 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh @@ -25,8 +25,8 @@ # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ "-client-net-ip-address 169.235.27.138" # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ "" q03 # - -CLUSTER_COUNT=5 +REPEAT=5 +FRAME_SIZE=10000 if [ -z "${1}" ] then @@ -34,30 +34,39 @@ then exit fi -# Run queries for each number of nodes. 
-for (( i = 0; i < ${CLUSTER_COUNT}; i++ )) -do - echo "Starting ${i} cluster nodes" - python vxquery-server/src/main/resources/scripts/cluster_cli.py -c vxquery-server/src/main/resources/conf/${i}nodes.xml -a start - - for j in $(find ${1} -name '*q??.xq') - do - # Only work with i nodes. - if [[ "${j}" =~ "${i}nodes" ]] +if [ -z "${2}" ] +then + echo "Please the number of nodes (start at 0)." + exit +fi + +# Run queries for the specified number of nodes. +echo "Starting ${2} cluster nodes" +python vxquery-server/src/main/resources/scripts/cluster_cli.py -c vxquery-server/src/main/resources/conf/${2}nodes.xml -a start + +# wait for cluster to finish setting up +sleep 5 + +export JAVA_OPTS="$JAVA_OPTS -server -Xmx8G -XX:+HeapDumpOnOutOfMemoryError -Djava.util.logging.config.file=./vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/benchmark_logging.properties" + +for j in $(find ${1} -name '*q??.xq') +do + # Only work with i nodes. + if [[ "${j}" =~ "${2}nodes" ]] + then + # Only run for specified queries. + if [ -z "${4}" ] || [[ "${j}" =~ "${4}" ]] then - # Only run for specified queries. - if [ -z "${3}" ] || [[ "${j}" =~ "${3}" ]] - then - echo "Running query: ${j}" - log_file="$(basename ${j}).$(date +%Y%m%d%H%M).log" - log_base_path=$(dirname ${j/queries/query_logs}) - mkdir -p ${log_base_path} - time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${2} -timing -showquery -showoet -showrp -frame-size 10000 -repeatexec 10 > ${log_base_path}/${log_file} 2>&1 - fi; + date + echo "Running query: ${j}" + log_file="$(basename ${j}).$(date +%Y%m%d%H%M).log" + log_base_path=$(dirname ${j/queries/query_logs}) + mkdir -p ${log_base_path} + time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${3} -timing -showquery -showoet -showrp -frame-size ${FRAME_SIZE} -repeatexec ${REPEAT} > ${log_base_path}/${log_file} 2>&1 fi; - done - - # Stop cluster. 
- python vxquery-server/src/main/resources/scripts/cluster_cli.py -c vxquery-server/src/main/resources/conf/${i}nodes.xml -a stop + fi; done + +# Stop cluster. +python vxquery-server/src/main/resources/scripts/cluster_cli.py -c vxquery-server/src/main/resources/conf/${2}nodes.xml -a stop http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_mrql_test.sh ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_mrql_test.sh b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_mrql_test.sh new file mode 100644 index 0000000..dd25c01 --- /dev/null +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_mrql_test.sh @@ -0,0 +1,29 @@ +#!/bin/bash +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +export JAVA_HOME=/home/ecarm002/java/jdk1.6.0_45 +REPEAT=${1} + +#for n in `seq 0 7` +for n in 6 +do + date + echo "Running q0${n} for MRQL." 
+ time for i in {1..${REPEAT}}; do ~/mrql/incubator-mrql/bin/mrql -dist -nodes 5 ~/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/other_systems/mrql/q0${n}.mrql > weather_data/mrql/query_logs/gsn/q0${n}.mrql.log 2>&1; done; +done http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py index 081f80a..f3c9e68 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py @@ -32,10 +32,39 @@ from weather_data_files import * # logs/ class WeatherBenchmark: + DATA_LINKS_FOLDER = "data_links/" + LARGE_FILE_ROOT_TAG = WeatherDataFiles.LARGE_FILE_ROOT_TAG QUERY_REPLACEMENT_KEY = "/tmp/1.0_partition_ghcnd_all_xml/" QUERY_MASTER_FOLDER = "../queries/" - QUERY_FILE_LIST = ["q00.xq", "q01.xq", "q02.xq", "q03.xq", "q04.xq", "q05.xq", "q06.xq", "q07.xq"] - QUERY_UTILITY_LIST = ["sensor_count.xq", "station_count.xq", "q04_sensor.xq", "q04_station.xq", "q05_sensor.xq", "q05_station.xq", "q06_sensor.xq", "q06_station.xq", "q07_tmin.xq", "q07_tmax.xq"] + QUERY_FILE_LIST = [ + "q00.xq", + "q01.xq", + "q02.xq", + "q03.xq", + "q04.xq", + "q05.xq", + "q06.xq", + "q07.xq" + ] + QUERY_UTILITY_LIST = [ + "sensor_count.xq", + "station_count.xq", + "q04_join_count.xq", + "q04_sensor.xq", + "q04_station.xq", + "q05_join_count.xq", + "q05_sensor.xq", + "q05_station.xq", + "q06_join_count.xq", + "q06_sensor.xq", + "q06_station.xq", + "q07_join_count.xq", + "q07_tmin.xq", + "q07_tmin_values.xq", + "q07_tmin_self.xq", + "q07_tmax.xq", + "q07_tmax_values.xq" + ] BENCHMARK_LOCAL_TESTS = ["local_speed_up", "local_batch_scale_out"] 
BENCHMARK_CLUSTER_TESTS = ["speed_up", "batch_scale_out"] QUERY_COLLECTIONS = ["sensors", "stations"] @@ -48,30 +77,30 @@ class WeatherBenchmark: self.dataset = dataset self.nodes = nodes - def print_partition_scheme(self, xml_save_path): + def print_partition_scheme(self): if (len(self.base_paths) == 0): return for test in self.dataset.get_tests(): if test in self.BENCHMARK_LOCAL_TESTS: - self.print_local_partition_schemes(test, xml_save_path) + self.print_local_partition_schemes(test) elif test in self.BENCHMARK_CLUSTER_TESTS: - self.print_cluster_partition_schemes(test, xml_save_path) + self.print_cluster_partition_schemes(test) else: print "Unknown test." exit() - def print_local_partition_schemes(self, test, xml_save_path): + def print_local_partition_schemes(self, test): node_index = 0 virtual_partitions = get_local_virtual_partitions(self.partitions) for p in self.partitions: - scheme = self.get_local_partition_scheme(test, xml_save_path, p) + scheme = self.get_local_partition_scheme(test, p) self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index) - def print_cluster_partition_schemes(self, test, xml_save_path): + def print_cluster_partition_schemes(self, test): node_index = self.get_current_node_index() virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions) for p in self.partitions: - scheme = self.get_cluster_partition_scheme(test, xml_save_path, p) + scheme = self.get_cluster_partition_scheme(test, p) self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index) def print_partition_schemes(self, virtual_partitions, scheme, test, partitions, node_id): @@ -94,11 +123,11 @@ class WeatherBenchmark: else: print " Scheme is EMPTY." 
- def get_local_partition_scheme(self, test, xml_save_path, partition): + def get_local_partition_scheme(self, test, partition): scheme = [] virtual_partitions = get_local_virtual_partitions(self.partitions) data_schemes = get_partition_scheme(0, virtual_partitions, self.base_paths) - link_base_schemes = get_partition_scheme(0, partition, self.base_paths, "data_links/" + test) + link_base_schemes = get_partition_scheme(0, partition, self.base_paths, self.DATA_LINKS_FOLDER + test) # Match link paths to real data paths. group_size = len(data_schemes) / len(link_base_schemes) @@ -117,7 +146,7 @@ class WeatherBenchmark: offset += group_size return scheme - def get_cluster_partition_scheme(self, test, xml_save_path, partition): + def get_cluster_partition_scheme(self, test, partition): node_index = self.get_current_node_index() if node_index == -1: print "Unknown host." @@ -127,7 +156,7 @@ class WeatherBenchmark: local_virtual_partitions = get_local_virtual_partitions(self.partitions) virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions) data_schemes = get_partition_scheme(node_index, virtual_partitions, self.base_paths) - link_base_schemes = get_cluster_link_scheme(len(self.nodes), partition, self.base_paths, "data_links/" + test) + link_base_schemes = get_cluster_link_scheme(len(self.nodes), partition, self.base_paths, self.DATA_LINKS_FOLDER + test) # Match link paths to real data paths. 
for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: @@ -145,6 +174,7 @@ class WeatherBenchmark: has_data = True if link_node < node_index: has_data = False + # Make links for date_node, data_disk, data_virtual, data_index, data_path in data_schemes: if has_data and data_disk == link_disk \ @@ -153,36 +183,68 @@ class WeatherBenchmark: scheme.append([link_disk, -1, link_index, "", link_path]) return scheme - def build_data_links(self, xml_save_path): + def build_data_links(self, reset): if (len(self.base_paths) == 0): return + if reset: + shutil.rmtree(self.base_paths[0] + self.DATA_LINKS_FOLDER) for test in self.dataset.get_tests(): if test in self.BENCHMARK_LOCAL_TESTS: for i in self.partitions: - scheme = self.get_local_partition_scheme(test, xml_save_path, i) + scheme = self.get_local_partition_scheme(test, i) + self.build_data_links_scheme(scheme) + if 1 in self.partitions and len(self.base_paths) > 1: + scheme = self.build_data_links_local_zero_partition(test) self.build_data_links_scheme(scheme) elif test in self.BENCHMARK_CLUSTER_TESTS: for i in self.partitions: - scheme = self.get_cluster_partition_scheme(test, xml_save_path, i) + scheme = self.get_cluster_partition_scheme(test, i) + self.build_data_links_scheme(scheme) + if 1 in self.partitions and len(self.base_paths) > 1: + scheme = self.build_data_links_cluster_zero_partition(test) self.build_data_links_scheme(scheme) else: print "Unknown test." 
exit() def build_data_links_scheme(self, scheme): - """Build all the data links based on the scheme information.""" - link_path_cleared = [] + '''Build all the data links based on the scheme information.''' for (data_disk, data_index, partition, data_path, link_path) in scheme: - if link_path not in link_path_cleared and os.path.isdir(link_path): - shutil.rmtree(link_path) - link_path_cleared.append(link_path) self.add_collection_links_for(data_path, link_path, data_index) + def build_data_links_cluster_zero_partition(self, test): + '''Build a scheme for all data in one symbolically linked folder. (0 partition)''' + scheme = [] + link_base_schemes = get_cluster_link_scheme(len(self.nodes), 1, self.base_paths, self.DATA_LINKS_FOLDER + test) + for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: + new_link_path = self.get_zero_partition_path(link_node, self.DATA_LINKS_FOLDER + test + "/" + str(link_node) + "nodes") + scheme.append([0, link_disk, 0, link_path, new_link_path]) + return scheme + + def build_data_links_local_zero_partition(self, test): + '''Build a scheme for all data in one symbolically linked folder. 
(0 partition)''' + scheme = [] + index = 0 + link_base_schemes = get_partition_scheme(0, 1, self.base_paths, self.DATA_LINKS_FOLDER + test) + for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: + if test == "local_batch_scale_out" and index > 0: + continue + new_link_path = self.get_zero_partition_path(link_node, self.DATA_LINKS_FOLDER + test) + scheme.append([0, index, 0, link_path, new_link_path]) + index += 1 + return scheme + + def get_zero_partition_path(self, node, key): + '''Return a partition path for the zero partition.''' + base_path = self.base_paths[0] + new_link_path = get_partition_scheme(node, 1, [base_path], key)[0][PARTITION_INDEX_PATH] + return new_link_path.replace("p1", "p0") + def get_current_node_index(self): found = False node_index = 0 for machine in self.nodes: - if socket.gethostname() == machine.get_node_name(): + if socket.gethostname().startswith(machine.get_node_name()): found = True break node_index += 1 @@ -195,10 +257,13 @@ class WeatherBenchmark: def add_collection_links_for(self, real_path, link_path, index): for collection in self.QUERY_COLLECTIONS: collection_path = link_path + collection + "/" + collection_index = collection_path + "index" + str(index) if not os.path.isdir(collection_path): os.makedirs(collection_path) if index >= 0: - os.symlink(real_path + collection + "/", collection_path + "index" + str(index)) + if os.path.islink(collection_index): + os.unlink(collection_index) + os.symlink(real_path + collection + "/", collection_index) def copy_query_files(self, reset): for test in self.dataset.get_tests(): @@ -213,25 +278,39 @@ class WeatherBenchmark: def copy_cluster_query_files(self, test, reset): '''Determine the data_link path for cluster query files and copy with new location for collection.''' - partitions = self.dataset.get_partitions()[0] + if 1 in self.partitions and len(self.base_paths) > 1: + for n in range(len(self.nodes)): + query_path = 
get_cluster_query_path(self.base_paths, test, 0, n) + prepare_path(query_path, reset) + + # Copy query files. + new_link_path = self.get_zero_partition_path(n, self.DATA_LINKS_FOLDER + test + "/" + str(n) + "nodes") + self.copy_and_replace_query(query_path, [new_link_path]) for n in range(len(self.nodes)): for p in self.partitions: query_path = get_cluster_query_path(self.base_paths, test, p, n) prepare_path(query_path, reset) # Copy query files. - partition_paths = get_partition_paths(n, p, self.base_paths, "data_links/" + test + "/" + str(n) + "nodes") + partition_paths = get_partition_paths(n, p, self.base_paths, self.DATA_LINKS_FOLDER + test + "/" + str(n) + "nodes") self.copy_and_replace_query(query_path, partition_paths) def copy_local_query_files(self, test, reset): '''Determine the data_link path for local query files and copy with new location for collection.''' + if 1 in self.partitions and len(self.base_paths) > 1: + query_path = get_local_query_path(self.base_paths, test, 0) + prepare_path(query_path, reset) + + # Copy query files. + new_link_path = self.get_zero_partition_path(0, self.DATA_LINKS_FOLDER + test) + self.copy_and_replace_query(query_path, [new_link_path]) for p in self.partitions: query_path = get_local_query_path(self.base_paths, test, p) prepare_path(query_path, reset) # Copy query files. - partition_paths = get_partition_paths(0, p, self.base_paths, "data_links/" + test) + partition_paths = get_partition_paths(0, p, self.base_paths, self.DATA_LINKS_FOLDER + test) self.copy_and_replace_query(query_path, partition_paths) def copy_and_replace_query(self, query_path, replacement_list): @@ -250,6 +329,13 @@ class WeatherBenchmark: for line in fileinput.input(query_path + query_file, True): sys.stdout.write(line.replace(self.QUERY_REPLACEMENT_KEY + collection, replace_string)) + # Make a search replace for partition type. 
+ if self.dataset.get_partition_type() == "large_files": + for line in fileinput.input(query_path + query_file, True): + sys.stdout.write(line.replace("/stationCollection", "/" + self.LARGE_FILE_ROOT_TAG + "/stationCollection")) + for line in fileinput.input(query_path + query_file, True): + sys.stdout.write(line.replace("/dataCollection", "/" + self.LARGE_FILE_ROOT_TAG + "/dataCollection")) + def get_number_of_slices(self): if len(self.dataset.get_tests()) == 0: print "No test has been defined in config file." http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py index 5bfa698..8ac6d17 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py @@ -207,15 +207,18 @@ def main(argv): print 'Processing the partition section (' + dataset.get_name() + ':d' + str(len(base_paths)) + ':s' + str(slices) + ').' data.reset() if section == "partition_scheme": - benchmark.print_partition_scheme(xml_data_save_path) + benchmark.print_partition_scheme() else: - data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset) + if dataset.get_partition_type() == "large_files": + data.build_to_n_partition_files(xml_data_save_path, slices, base_paths, reset) + else: + data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset) if section in ("all", "test_links"): # TODO determine current node print 'Processing the test links section (' + dataset.get_name() + ').' 
- benchmark.print_partition_scheme(xml_data_save_path) - benchmark.build_data_links(xml_data_save_path) + benchmark.print_partition_scheme() + benchmark.build_data_links(reset) if section in ("all", "queries"): print 'Processing the queries section (' + dataset.get_name() + ').' http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py index a6513c2..80607b8 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py @@ -41,9 +41,10 @@ class WeatherConfig: for node in self.config.getElementsByTagName("dataset"): name = self.get_dataset_name(node) save_paths = self.get_dataset_save_paths(node) + partition_type = self.get_dataset_partition_type(node) partitions = self.get_dataset_partitions(node) tests = self.get_dataset_tests(node) - nodes.append(Dataset(name, save_paths, partitions, tests)) + nodes.append(Dataset(name, save_paths, partition_type, partitions, tests)) return nodes @@ -69,6 +70,9 @@ class WeatherConfig: paths.append(self.get_text(item)) return paths + def get_dataset_partition_type(self, node): + return self.get_text(node.getElementsByTagName("partition_type")[0]) + def get_dataset_partitions(self, node): paths = [] for item in node.getElementsByTagName("partitions_per_path"): @@ -103,10 +107,11 @@ class Machine: return self.id + "(" + self.ip + ")" class Dataset: - def __init__(self, name, save_paths, partitions, tests): + def __init__(self, name, save_paths, partition_type, partitions, tests): self.name = name self.save_paths = save_paths self.partitions = partitions + self.partition_type = partition_type 
self.tests = tests def get_name(self): @@ -118,6 +123,9 @@ class Dataset: def get_partitions(self): return self.partitions + def get_partition_type(self): + return self.partition_type + def get_tests(self): return self.tests http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py index a4f33a1..5db090a 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py @@ -150,10 +150,6 @@ class WeatherConvertToXML: # Default return 0 - def process_one_day(self, records, report_date, page): - # Default - return 0 - def process_station_data(self, row): # Default return 0 @@ -165,7 +161,7 @@ class WeatherConvertToXML: print "Processing inventory file" file_stream = open(self.ghcnd_inventory, 'r') - csv_header = ['ID', 'SENSORS', 'SENSORS_COUNT', 'MAX_YEARS', 'TOTAL_YEARS_FOR_ALL_SENSORS'] + csv_header = ['ID', 'SENSORS', 'SENSORS_COUNT', 'MAX_YEARS', 'TOTAL_YEARS_FOR_ALL_SENSORS'] row = file_stream.readline() csv_inventory = {} for row in file_stream: @@ -243,16 +239,6 @@ class WeatherConvertToXML: def convert_c2f(self, c): return (9 / 5 * c) + 32 - def default_xml_start(self): - return textwrap.dedent("""\ - <?xml version="1.0" encoding="ISO-8859-1"?> - <ghcnd_observation version="1.0" - xmlns:xsd="http://www.w3.org/2001/XMLSchema" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> - <credit>NOAA's National Climatic Data Center (NCDC)</credit> - <credit_URL>http://www.ncdc.noaa.gov/</credit_URL> - """) - def default_xml_web_service_start(self): field_xml = "" field_xml += "<?xml 
version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n" @@ -273,76 +259,6 @@ class WeatherConvertToXML: field_xml += self.get_indent_space(indent) + "<date>" + str(report_date.year) + "-" + str(report_date.month).zfill(2) + "-" + str(report_date.day).zfill(2) + "T00:00:00.000</date>\n" return field_xml - def get_date_from_field(self, row, field): - report_date = self.get_field_from_definition(row, field) - return str(report_date.year) + "-" + str(report_date.month).zfill(2) + "-" + str(report_date.day).zfill(2) - - def default_xml_field_date_iso8601(self, report_date): - field_xml = "" - field_xml += " <observation_date>" + self.MONTHS[report_date.month - 1] + " " + str(report_date.day) + ", " + str(report_date.year) + "</observation_date>\n" - field_xml += " <observation_date_iso8601>" + report_date.isoformat() + "</observation_date_iso8601>\n" - return field_xml - - def default_xml_field_date_year(self, year): - field_xml = "" - field_xml += " <observation_year>" + str(year) + "</observation_year>\n" - return field_xml - - def default_xml_field_date_month(self, month): - field_xml = "" - field_xml += " <observation_month>" + str(month) + "</observation_month>\n" - return field_xml - - def default_xml_field_date_day(self, day): - field_xml = "" - field_xml += " <observation_day>" + str(day) + "</observation_day>\n" - return field_xml - - def default_xml_field_station_id(self, station_id, indent=2): - field_xml = "" - field_xml += self.get_indent_space(indent) + "<station_id>" + station_id + "</station_id>\n" - return field_xml - - def default_xml_field_station(self, station_id): - station_row = "" - stations_file = open(self.ghcnd_stations, 'r') - - for line in stations_file: - if station_id == self.get_field_from_definition(line, STATIONS_FIELDS['ID']): - station_row = line - break - - field_xml = "" - field_xml += " <station_id>" + station_id + "</station_id>\n" - field_xml += " <location>\n" - field_xml += " <latitude>" + 
self.get_field_from_definition(station_row, STATIONS_FIELDS['LATITUDE']).strip() + "</latitude>\n" - field_xml += " <longitude>" + self.get_field_from_definition(station_row, STATIONS_FIELDS['LONGITUDE']).strip() + "</longitude>\n" - - elevation = self.get_field_from_definition(station_row, STATIONS_FIELDS['ELEVATION']).strip() - if elevation != "-999.9": - field_xml += " <elevation>" + elevation + "</elevation>\n" - - field_xml += " </location>\n" - field_xml += " <name>" + self.get_field_from_definition(station_row, STATIONS_FIELDS['NAME']).strip() + "</name>\n" - - state = self.get_field_from_definition(station_row, STATIONS_FIELDS['STATE']) - if state.strip(): - field_xml += " <state>" + state + "</state>\n" - - gsn = self.get_field_from_definition(station_row, STATIONS_FIELDS['GSNFLAG']) - if gsn.strip(): - field_xml += " <gsn />\n" - - hcn = self.get_field_from_definition(station_row, STATIONS_FIELDS['HCNFLAG']) - if hcn.strip(): - field_xml += " <hcn />\n" - - wmoid = self.get_field_from_definition(station_row, STATIONS_FIELDS['WMOID']) - if wmoid.strip(): - field_xml += " <wmoid id=\"" + wmoid + "\" />\n" - - return field_xml - def default_xml_mshr_station_additional(self, station_id): """The web service station data is generate from the MSHR data supplemented with GHCN-Daily.""" station_mshr_row = "" @@ -492,120 +408,6 @@ class WeatherConvertToXML: def get_indent_space(self, indent): return (" " * (4 * indent)) -class WeatherDailyXMLFile(WeatherConvertToXML): - def process_one_month_sensor_set(self, records, page): - year = int(self.get_dly_field(records[0], DLY_FIELD_YEAR)) - month = int(self.get_dly_field(records[0], DLY_FIELD_MONTH)) - - station_id = self.get_dly_field(records[0], DLY_FIELD_ID) - - count = 0 - for day in range(1, 32): - try: - # TODO find out what is a valid python date range? 1889? - # Attempt to see if this is valid date. 
- report_date = date(year, month, day) - save_file_name = self.process_one_day(records, report_date, page) - if save_file_name is not "": - count = count + 1 - if self.debug_output: - print "Wrote file: " + save_file_name - except ValueError: - if self.debug_output: - print "Error: Not a valid date (" + str(month) + "/" + str(day) + "/" + str(year) + ") for " + station_id + "." - pass - return count - - def process_one_day(self, records, report_date, page): - station_id = self.get_dly_field(records[0], DLY_FIELD_ID) - found_data = False - - # Information for each daily file. - daily_xml_file = self.default_xml_start() - daily_xml_file += self.default_xml_field_station(station_id) - daily_xml_file += self.default_xml_field_date_iso8601(report_date) - daily_xml_file += self.default_xml_start_tag("sensors") - for record in records: - record_xml_snip = self.default_xml_day_reading_as_field(record, report_date.day) - if record_xml_snip is not "": - found_data = True - daily_xml_file += record_xml_snip - daily_xml_file += self.default_xml_end_tag("sensors") - daily_xml_file += self.default_xml_end() - - if not found_data: - return "" - - # Make sure the station folder is available. - ghcnd_xml_station_path = self.get_base_folder(station_id) + "/" + station_id + "/" + str(report_date.year) + "/" - if not os.path.isdir(ghcnd_xml_station_path): - os.makedirs(ghcnd_xml_station_path) - - # Save XML string to disk. - save_file_name = ghcnd_xml_station_path + build_sensor_save_filename(station_id, report_date, page) - save_file_name = self.save_file(save_file_name, daily_xml_file) - - return save_file_name - -class WeatherMonthlyXMLFile(WeatherConvertToXML): - def process_one_month_sensor_set(self, records, page): - found_data = False - year = int(self.get_dly_field(records[0], DLY_FIELD_YEAR)) - month = int(self.get_dly_field(records[0], DLY_FIELD_MONTH)) - - station_id = self.get_dly_field(records[0], DLY_FIELD_ID) - - # Information for each daily file. 
- daily_xml_file = self.default_xml_start() - daily_xml_file += self.default_xml_field_station(station_id) - daily_xml_file += self.default_xml_field_date_year(year) - daily_xml_file += self.default_xml_field_date_month(month) - daily_xml_file += self.default_xml_start_tag("readings") - - for day in range(1, 32): - try: - # TODO find out what is a valid python date range? 1889? - # Attempt to see if this is valid date. - report_date = date(year, month, day) - found_daily_data = False - record_xml_snip = "" - - for record in records: - record_xml_snip += self.default_xml_day_reading_as_field(record, report_date.day) - if record_xml_snip is not "": - found_data = True - found_daily_data = True - - if found_daily_data: - daily_xml_file += self.default_xml_start_tag("reading", 2) - daily_xml_file += self.default_xml_field_date_day(day) - daily_xml_file += record_xml_snip - daily_xml_file += self.default_xml_end_tag("reading", 2) - - except ValueError: - pass - - daily_xml_file += self.default_xml_end_tag("readings") - daily_xml_file += self.default_xml_end() - - if not found_data: - return 0 - - # Make sure the station folder is available. - ghcnd_xml_station_path = self.get_base_folder(station_id) + "/" + station_id + "/" + str(report_date.year) + "/" - if not os.path.isdir(ghcnd_xml_station_path): - os.makedirs(ghcnd_xml_station_path) - - # Save XML string to disk. 
- save_file_name = ghcnd_xml_station_path + build_sensor_save_filename(station_id, report_date, page) - save_file_name = self.save_file(save_file_name, daily_xml_file) - - if save_file_name is not "": - if self.debug_output: - print "Wrote file: " + save_file_name - return 1 - else: - return 0 class WeatherWebServiceMonthlyXMLFile(WeatherConvertToXML): """The web service class details how to create files similar to the NOAA web service.""" http://git-wip-us.apache.org/repos/asf/vxquery/blob/481f9edb/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py index a7fb691..1c9f129 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py @@ -26,6 +26,8 @@ from collections import OrderedDict # Allows partition and picking up where you left off. class WeatherDataFiles: + LARGE_FILE_ROOT_TAG = "root" + INDEX_DATA_FILE_NAME = 0 INDEX_DATA_SENSORS_STATUS = 1 INDEX_DATA_STATION_STATUS = 2 @@ -51,14 +53,10 @@ class WeatherDataFiles: self.current = self.DATA_FILE_START_INDEX self.progress_data = [] - - def get_file_list(self): - return glob.glob(self.base_path + "/*" + self.DATA_FILE_EXTENSION) - def get_file_list_iterator(self): + """Return the list of files one at a time.""" return glob.iglob(self.base_path + "/*" + self.DATA_FILE_EXTENSION) - # Save Functions def build_progress_file(self, options, convert): if not os.path.isfile(self.progress_file_name) or 'reset' in options: @@ -98,10 +96,8 @@ class WeatherDataFiles: return # Initialize the partition paths. 
- partition_sizes = [] partition_paths = get_partition_paths(0, partitions, base_paths) for path in partition_paths: - partition_sizes.append(0) # Make sure the xml folder is available. prepare_path(path, reset) @@ -145,66 +141,72 @@ class WeatherDataFiles: if current_station_partition >= len(partition_paths): current_station_partition = 0 - - def copy_to_n_partitions_by_station(self, save_path, partitions, base_paths, reset): - """Once the initial data has been generated, the data can be copied into a set number of partitions. """ + def build_to_n_partition_files(self, save_path, partitions, base_paths, reset): + """Once the initial data has been generated, the data can be divided into partitions + and stored in single files. + """ if (len(base_paths) == 0): return + XML_START = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>" + # Initialize the partition paths. - partition_sizes = [] partition_paths = get_partition_paths(0, partitions, base_paths) + sensors_partition_files = [] + stations_partition_files = [] for path in partition_paths: - partition_sizes.append(0) # Make sure the xml folder is available. prepare_path(path, reset) + prepare_path(path + "sensors/", False) + prepare_path(path + "stations/", False) + sensors_partition_files.append(open(path + "sensors/partition.xml", 'w')) + stations_partition_files.append(open(path + "stations/partition.xml", 'w')) + + for row in range(0, len(partition_paths)): + sensors_partition_files[row].write(XML_START + "<" + self.LARGE_FILE_ROOT_TAG + ">\n") + stations_partition_files[row].write(XML_START + "<" + self.LARGE_FILE_ROOT_TAG + ">\n") - # copy stations and sensors into each partition - current_partition = 0 - csv_sorted = self.get_csv_in_partition_order() - for item, size in csv_sorted.iteritems(): - if size < 0: - print "The progress file does not have the sensor size data saved." - return - - station_id = item.split('.')[0] - # Update partition bases on smallest current size. 
- current_partition = partition_sizes.index(min(partition_sizes)) - - # Copy sensor files - type = "sensors" - file_path = build_base_save_folder(save_path, station_id, type) + station_id - new_file_path = build_base_save_folder(partition_paths[current_partition], station_id, type) + station_id - if os.path.isdir(file_path): - distutils.dir_util.copy_tree(file_path, new_file_path) - partition_sizes[current_partition] += size + import fnmatch + import os - # Copy station files - type = "stations" - file_path = build_base_save_folder(save_path, station_id, type) + station_id + ".xml" - new_file_base = build_base_save_folder(partition_paths[current_partition], station_id, type) - new_file_path = new_file_base + station_id + ".xml" - if os.path.isfile(file_path): - if not os.path.isdir(new_file_base): - os.makedirs(new_file_base) - shutil.copyfile(file_path, new_file_path) - - def get_csv_in_partition_order(self): + # copy stations and sensors into each partition + current_sensor_partition = 0 + current_station_partition = 0 self.open_progress_data() row_count = len(self.progress_data) - - # Get the dictionary of all the files and data sizes. 
- csv_dict = dict() for row in range(0, row_count): row_contents = self.progress_data[row].rsplit(self.SEPERATOR) file_name = row_contents[self.INDEX_DATA_FILE_NAME] - folder_data = int(row_contents[self.INDEX_DATA_FOLDER_DATA]) + station_id = os.path.basename(file_name).split('.')[0] + + # Copy sensor files + type = "sensors" + file_path = build_base_save_folder(save_path, station_id, type) + station_id + for root, dirnames, filenames in os.walk(file_path): + for filename in fnmatch.filter(filenames, '*.xml'): + xml_path = os.path.join(root, filename) + xml_data = file_get_contents(xml_path).replace(XML_START, "") + "\n" + sensors_partition_files[current_sensor_partition].write(xml_data) + current_sensor_partition += 1 + if current_sensor_partition >= len(sensors_partition_files): + current_sensor_partition = 0 - csv_dict[file_name] = folder_data - - # New sorted list. - return OrderedDict(sorted(csv_dict.items(), key=lambda x: x[1], reverse=True)) - + # Copy station files + type = "stations" + file_path = build_base_save_folder(save_path, station_id, type) + station_id + ".xml" + xml_path = os.path.join(root, file_path) + xml_data = file_get_contents(xml_path).replace(XML_START, "") + "\n" + stations_partition_files[current_station_partition].write(xml_data) + current_station_partition += 1 + if current_station_partition >= len(partition_paths): + current_station_partition = 0 + + for row in range(0, len(partition_paths)): + sensors_partition_files[row].write("</" + self.LARGE_FILE_ROOT_TAG + ">\n") + sensors_partition_files[row].close() + stations_partition_files[row].write("</" + self.LARGE_FILE_ROOT_TAG + ">\n") + stations_partition_files[row].close() + def get_file_row(self, file_name): for i in range(0, len(self.progress_data)): if self.progress_data[i].startswith(file_name): @@ -405,3 +407,6 @@ def prepare_path(path, reset): if not os.path.isdir(path): os.makedirs(path) +def file_get_contents(filename): + with open(filename) as f: + return f.read()
