Moved all benchmark changes to a new branch.
Project: http://git-wip-us.apache.org/repos/asf/vxquery/repo Commit: http://git-wip-us.apache.org/repos/asf/vxquery/commit/94acdb09 Tree: http://git-wip-us.apache.org/repos/asf/vxquery/tree/94acdb09 Diff: http://git-wip-us.apache.org/repos/asf/vxquery/diff/94acdb09 Branch: refs/heads/site Commit: 94acdb097cb6e95ac1547987f76e71763dc0960b Parents: fee9582 Author: Preston Carman <[email protected]> Authored: Wed Jun 25 19:07:53 2014 -0700 Committer: Preston Carman <[email protected]> Committed: Wed Jun 25 19:07:53 2014 -0700 ---------------------------------------------------------------------- .../resources/noaa-ghcn-daily/queries/q00.xq | 13 +- .../resources/noaa-ghcn-daily/queries/q02.xq | 12 +- .../noaa-ghcn-daily/scripts/run_benchmark.sh | 7 +- .../scripts/run_benchmark_cluster.sh | 59 +++--- .../scripts/weather_benchmark.py | 138 +++---------- .../noaa-ghcn-daily/scripts/weather_cli.py | 11 +- .../noaa-ghcn-daily/scripts/weather_config.py | 12 +- .../scripts/weather_convert_to_xml.py | 200 ++++++++++++++++++- .../scripts/weather_data_files.py | 107 +++++----- .../noaa-ghcn-daily/scripts/weather_example.xml | 34 ++++ .../scripts/weather_example_cluster.xml | 56 ++++++ 11 files changed, 408 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/vxquery/blob/94acdb09/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/queries/q00.xq ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/queries/q00.xq b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/queries/q00.xq index 5006a21..743f95a 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/queries/q00.xq +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/queries/q00.xq @@ -14,17 +14,14 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. :) -(: -XQuery Filter Query -------------------- -See historical data for Key West International Airport, FL (USW00012836) -station by selecting the weather readings for December 25 over the last -10 years. -:) + +(: XQuery Filter Query :) +(: See historical data for Riverside, CA (ASN00008113) station by selecting :) +(: the weather readings for December 25 over the last 10 years. :) let $collection := "/tmp/1.0_partition_ghcnd_all_xml/sensors" for $r in collection($collection)/dataCollection/data let $datetime := xs:dateTime(fn:data($r/date)) -where $r/station eq "GHCND:USW00012836" +where $r/station eq "GHCND:ASN00008113" and fn:year-from-dateTime($datetime) ge 2003 and fn:month-from-dateTime($datetime) eq 12 and fn:day-from-dateTime($datetime) eq 25 http://git-wip-us.apache.org/repos/asf/vxquery/blob/94acdb09/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/queries/q02.xq ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/queries/q02.xq b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/queries/q02.xq index 0635618..cbe72da 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/queries/q02.xq +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/queries/q02.xq @@ -14,16 +14,14 @@ KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. :) -(: -XQuery Aggregate Query ----------------------- -Find the annual precipitation (PRCP) for a Syracuse, NY using the airport -weather station (USW00014771) report for 1999. -:) + +(: XQuery Aggregate Query :) +(: Find the annual precipitation (PRCP) for a Seattle using the airport :) +(: station (USW00024233) for 1999. :) fn:sum( let $collection := "/tmp/1.0_partition_ghcnd_all_xml/sensors" for $r in collection($collection)/dataCollection/data - where $r/station eq "GHCND:USW00014771" + where $r/station eq "GHCND:USW00024233" and $r/dataType eq "PRCP" and fn:year-from-dateTime(xs:dateTime(fn:data($r/date))) eq 1999 return $r/value http://git-wip-us.apache.org/repos/asf/vxquery/blob/94acdb09/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh index b2b1531..2dd070c 100755 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark.sh @@ -25,8 +25,6 @@ # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ "-client-net-ip-address 169.235.27.138" # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ "" q03 # -REPEAT=5 -FRAME_SIZE=10000 if [ -z "${1}" ] then @@ -34,18 +32,15 @@ then exit fi -export JAVA_OPTS="$JAVA_OPTS -server -Xmx8G -XX:+HeapDumpOnOutOfMemoryError -Djava.util.logging.config.file=./vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/benchmark_logging.properties" - for j in $(find ${1} -name '*q??.xq') do if [ -z "${3}" ] || [[ "${j}" =~ "${3}" ]] then - date echo "Running query: ${j}" log_file="$(basename ${j}).$(date +%Y%m%d%H%M).log" log_base_path=$(dirname ${j/queries/query_logs}) mkdir -p ${log_base_path} - time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${2} -timing -showquery -showoet -showrp -frame-size ${FRAME_SIZE} -repeatexec ${REPEAT} > ${log_base_path}/${log_file} 2>&1 + time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${2} -timing -showquery -showoet -showrp -frame-size 10000 -repeatexec 10 > ${log_base_path}/${log_file} 2>&1 fi; done http://git-wip-us.apache.org/repos/asf/vxquery/blob/94acdb09/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh index f868024..a77f3c2 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/run_benchmark_cluster.sh @@ -25,8 +25,8 @@ # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ "-client-net-ip-address 169.235.27.138" # run_benchmark.sh ./noaa-ghcn-daily/benchmarks/local_speed_up/queries/ "" q03 # -REPEAT=5 -FRAME_SIZE=10000 + +CLUSTER_COUNT=5 if [ -z "${1}" ] then @@ -34,39 +34,30 @@ then exit fi -if [ -z "${2}" ] -then - echo "Please the number of nodes (start at 0)." - exit -fi - -# Run queries for the specified number of nodes. -echo "Starting ${2} cluster nodes" -python vxquery-server/src/main/resources/scripts/cluster_cli.py -c vxquery-server/src/main/resources/conf/${2}nodes.xml -a start - -# wait for cluster to finish setting up -sleep 5 - -export JAVA_OPTS="$JAVA_OPTS -server -Xmx8G -XX:+HeapDumpOnOutOfMemoryError -Djava.util.logging.config.file=./vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/benchmark_logging.properties" - -for j in $(find ${1} -name '*q??.xq') -do - # Only work with i nodes. - if [[ "${j}" =~ "${2}nodes" ]] - then - # Only run for specified queries. - if [ -z "${4}" ] || [[ "${j}" =~ "${4}" ]] +# Run queries for each number of nodes. +for (( i = 0; i < ${CLUSTER_COUNT}; i++ )) +do + echo "Starting ${i} cluster nodes" + python vxquery-server/src/main/resources/scripts/cluster_cli.py -c vxquery-server/src/main/resources/conf/${i}nodes.xml -a start + + for j in $(find ${1} -name '*q??.xq') + do + # Only work with i nodes. + if [[ "${j}" =~ "${i}nodes" ]] then - date - echo "Running query: ${j}" - log_file="$(basename ${j}).$(date +%Y%m%d%H%M).log" - log_base_path=$(dirname ${j/queries/query_logs}) - mkdir -p ${log_base_path} - time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${3} -timing -showquery -showoet -showrp -frame-size ${FRAME_SIZE} -repeatexec ${REPEAT} > ${log_base_path}/${log_file} 2>&1 + # Only run for specified queries. + if [ -z "${3}" ] || [[ "${j}" =~ "${3}" ]] + then + echo "Running query: ${j}" + log_file="$(basename ${j}).$(date +%Y%m%d%H%M).log" + log_base_path=$(dirname ${j/queries/query_logs}) + mkdir -p ${log_base_path} + time sh ./vxquery-cli/target/appassembler/bin/vxq ${j} ${2} -timing -showquery -showoet -showrp -frame-size 10000 -repeatexec 10 > ${log_base_path}/${log_file} 2>&1 + fi; fi; - fi; -done + done -# Stop cluster. -python vxquery-server/src/main/resources/scripts/cluster_cli.py -c vxquery-server/src/main/resources/conf/${2}nodes.xml -a stop + # Stop cluster. + python vxquery-server/src/main/resources/scripts/cluster_cli.py -c vxquery-server/src/main/resources/conf/${i}nodes.xml -a stop +done http://git-wip-us.apache.org/repos/asf/vxquery/blob/94acdb09/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py index f3c9e68..081f80a 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_benchmark.py @@ -32,39 +32,10 @@ from weather_data_files import * # logs/ class WeatherBenchmark: - DATA_LINKS_FOLDER = "data_links/" - LARGE_FILE_ROOT_TAG = WeatherDataFiles.LARGE_FILE_ROOT_TAG QUERY_REPLACEMENT_KEY = "/tmp/1.0_partition_ghcnd_all_xml/" QUERY_MASTER_FOLDER = "../queries/" - QUERY_FILE_LIST = [ - "q00.xq", - "q01.xq", - "q02.xq", - "q03.xq", - "q04.xq", - "q05.xq", - "q06.xq", - "q07.xq" - ] - QUERY_UTILITY_LIST = [ - "sensor_count.xq", - "station_count.xq", - "q04_join_count.xq", - "q04_sensor.xq", - "q04_station.xq", - "q05_join_count.xq", - "q05_sensor.xq", - "q05_station.xq", - "q06_join_count.xq", - "q06_sensor.xq", - "q06_station.xq", - "q07_join_count.xq", - "q07_tmin.xq", - "q07_tmin_values.xq", - "q07_tmin_self.xq", - "q07_tmax.xq", - "q07_tmax_values.xq" - ] + QUERY_FILE_LIST = ["q00.xq", "q01.xq", "q02.xq", "q03.xq", "q04.xq", "q05.xq", "q06.xq", "q07.xq"] + QUERY_UTILITY_LIST = ["sensor_count.xq", "station_count.xq", "q04_sensor.xq", "q04_station.xq", "q05_sensor.xq", "q05_station.xq", "q06_sensor.xq", "q06_station.xq", "q07_tmin.xq", "q07_tmax.xq"] BENCHMARK_LOCAL_TESTS = ["local_speed_up", "local_batch_scale_out"] BENCHMARK_CLUSTER_TESTS = ["speed_up", "batch_scale_out"] QUERY_COLLECTIONS = ["sensors", "stations"] @@ -77,30 +48,30 @@ class WeatherBenchmark: self.dataset = dataset self.nodes = nodes - def print_partition_scheme(self): + def print_partition_scheme(self, xml_save_path): if (len(self.base_paths) == 0): return for test in self.dataset.get_tests(): if test in self.BENCHMARK_LOCAL_TESTS: - self.print_local_partition_schemes(test) + self.print_local_partition_schemes(test, xml_save_path) elif test in self.BENCHMARK_CLUSTER_TESTS: - self.print_cluster_partition_schemes(test) + self.print_cluster_partition_schemes(test, xml_save_path) else: print "Unknown test." exit() - def print_local_partition_schemes(self, test): + def print_local_partition_schemes(self, test, xml_save_path): node_index = 0 virtual_partitions = get_local_virtual_partitions(self.partitions) for p in self.partitions: - scheme = self.get_local_partition_scheme(test, p) + scheme = self.get_local_partition_scheme(test, xml_save_path, p) self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index) - def print_cluster_partition_schemes(self, test): + def print_cluster_partition_schemes(self, test, xml_save_path): node_index = self.get_current_node_index() virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions) for p in self.partitions: - scheme = self.get_cluster_partition_scheme(test, p) + scheme = self.get_cluster_partition_scheme(test, xml_save_path, p) self.print_partition_schemes(virtual_partitions, scheme, test, p, node_index) def print_partition_schemes(self, virtual_partitions, scheme, test, partitions, node_id): @@ -123,11 +94,11 @@ class WeatherBenchmark: else: print " Scheme is EMPTY." - def get_local_partition_scheme(self, test, partition): + def get_local_partition_scheme(self, test, xml_save_path, partition): scheme = [] virtual_partitions = get_local_virtual_partitions(self.partitions) data_schemes = get_partition_scheme(0, virtual_partitions, self.base_paths) - link_base_schemes = get_partition_scheme(0, partition, self.base_paths, self.DATA_LINKS_FOLDER + test) + link_base_schemes = get_partition_scheme(0, partition, self.base_paths, "data_links/" + test) # Match link paths to real data paths. group_size = len(data_schemes) / len(link_base_schemes) @@ -146,7 +117,7 @@ class WeatherBenchmark: offset += group_size return scheme - def get_cluster_partition_scheme(self, test, partition): + def get_cluster_partition_scheme(self, test, xml_save_path, partition): node_index = self.get_current_node_index() if node_index == -1: print "Unknown host." @@ -156,7 +127,7 @@ class WeatherBenchmark: local_virtual_partitions = get_local_virtual_partitions(self.partitions) virtual_partitions = get_cluster_virtual_partitions(self.nodes, self.partitions) data_schemes = get_partition_scheme(node_index, virtual_partitions, self.base_paths) - link_base_schemes = get_cluster_link_scheme(len(self.nodes), partition, self.base_paths, self.DATA_LINKS_FOLDER + test) + link_base_schemes = get_cluster_link_scheme(len(self.nodes), partition, self.base_paths, "data_links/" + test) # Match link paths to real data paths. for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: @@ -174,7 +145,6 @@ class WeatherBenchmark: has_data = True if link_node < node_index: has_data = False - # Make links for date_node, data_disk, data_virtual, data_index, data_path in data_schemes: if has_data and data_disk == link_disk \ @@ -183,68 +153,36 @@ class WeatherBenchmark: scheme.append([link_disk, -1, link_index, "", link_path]) return scheme - def build_data_links(self, reset): + def build_data_links(self, xml_save_path): if (len(self.base_paths) == 0): return - if reset: - shutil.rmtree(self.base_paths[0] + self.DATA_LINKS_FOLDER) for test in self.dataset.get_tests(): if test in self.BENCHMARK_LOCAL_TESTS: for i in self.partitions: - scheme = self.get_local_partition_scheme(test, i) - self.build_data_links_scheme(scheme) - if 1 in self.partitions and len(self.base_paths) > 1: - scheme = self.build_data_links_local_zero_partition(test) + scheme = self.get_local_partition_scheme(test, xml_save_path, i) self.build_data_links_scheme(scheme) elif test in self.BENCHMARK_CLUSTER_TESTS: for i in self.partitions: - scheme = self.get_cluster_partition_scheme(test, i) - self.build_data_links_scheme(scheme) - if 1 in self.partitions and len(self.base_paths) > 1: - scheme = self.build_data_links_cluster_zero_partition(test) + scheme = self.get_cluster_partition_scheme(test, xml_save_path, i) self.build_data_links_scheme(scheme) else: print "Unknown test." exit() def build_data_links_scheme(self, scheme): - '''Build all the data links based on the scheme information.''' + """Build all the data links based on the scheme information.""" + link_path_cleared = [] for (data_disk, data_index, partition, data_path, link_path) in scheme: + if link_path not in link_path_cleared and os.path.isdir(link_path): + shutil.rmtree(link_path) + link_path_cleared.append(link_path) self.add_collection_links_for(data_path, link_path, data_index) - def build_data_links_cluster_zero_partition(self, test): - '''Build a scheme for all data in one symbolically linked folder. (0 partition)''' - scheme = [] - link_base_schemes = get_cluster_link_scheme(len(self.nodes), 1, self.base_paths, self.DATA_LINKS_FOLDER + test) - for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: - new_link_path = self.get_zero_partition_path(link_node, self.DATA_LINKS_FOLDER + test + "/" + str(link_node) + "nodes") - scheme.append([0, link_disk, 0, link_path, new_link_path]) - return scheme - - def build_data_links_local_zero_partition(self, test): - '''Build a scheme for all data in one symbolically linked folder. (0 partition)''' - scheme = [] - index = 0 - link_base_schemes = get_partition_scheme(0, 1, self.base_paths, self.DATA_LINKS_FOLDER + test) - for link_node, link_disk, link_virtual, link_index, link_path in link_base_schemes: - if test == "local_batch_scale_out" and index > 0: - continue - new_link_path = self.get_zero_partition_path(link_node, self.DATA_LINKS_FOLDER + test) - scheme.append([0, index, 0, link_path, new_link_path]) - index += 1 - return scheme - - def get_zero_partition_path(self, node, key): - '''Return a partition path for the zero partition.''' - base_path = self.base_paths[0] - new_link_path = get_partition_scheme(node, 1, [base_path], key)[0][PARTITION_INDEX_PATH] - return new_link_path.replace("p1", "p0") - def get_current_node_index(self): found = False node_index = 0 for machine in self.nodes: - if socket.gethostname().startswith(machine.get_node_name()): + if socket.gethostname() == machine.get_node_name(): found = True break node_index += 1 @@ -257,13 +195,10 @@ class WeatherBenchmark: def add_collection_links_for(self, real_path, link_path, index): for collection in self.QUERY_COLLECTIONS: collection_path = link_path + collection + "/" - collection_index = collection_path + "index" + str(index) if not os.path.isdir(collection_path): os.makedirs(collection_path) if index >= 0: - if os.path.islink(collection_index): - os.unlink(collection_index) - os.symlink(real_path + collection + "/", collection_index) + os.symlink(real_path + collection + "/", collection_path + "index" + str(index)) def copy_query_files(self, reset): for test in self.dataset.get_tests(): @@ -278,39 +213,25 @@ class WeatherBenchmark: def copy_cluster_query_files(self, test, reset): '''Determine the data_link path for cluster query files and copy with new location for collection.''' - if 1 in self.partitions and len(self.base_paths) > 1: - for n in range(len(self.nodes)): - query_path = get_cluster_query_path(self.base_paths, test, 0, n) - prepare_path(query_path, reset) - - # Copy query files. - new_link_path = self.get_zero_partition_path(n, self.DATA_LINKS_FOLDER + test + "/" + str(n) + "nodes") - self.copy_and_replace_query(query_path, [new_link_path]) + partitions = self.dataset.get_partitions()[0] for n in range(len(self.nodes)): for p in self.partitions: query_path = get_cluster_query_path(self.base_paths, test, p, n) prepare_path(query_path, reset) # Copy query files. - partition_paths = get_partition_paths(n, p, self.base_paths, self.DATA_LINKS_FOLDER + test + "/" + str(n) + "nodes") + partition_paths = get_partition_paths(n, p, self.base_paths, "data_links/" + test + "/" + str(n) + "nodes") self.copy_and_replace_query(query_path, partition_paths) def copy_local_query_files(self, test, reset): '''Determine the data_link path for local query files and copy with new location for collection.''' - if 1 in self.partitions and len(self.base_paths) > 1: - query_path = get_local_query_path(self.base_paths, test, 0) - prepare_path(query_path, reset) - - # Copy query files. - new_link_path = self.get_zero_partition_path(0, self.DATA_LINKS_FOLDER + test) - self.copy_and_replace_query(query_path, [new_link_path]) for p in self.partitions: query_path = get_local_query_path(self.base_paths, test, p) prepare_path(query_path, reset) # Copy query files. - partition_paths = get_partition_paths(0, p, self.base_paths, self.DATA_LINKS_FOLDER + test) + partition_paths = get_partition_paths(0, p, self.base_paths, "data_links/" + test) self.copy_and_replace_query(query_path, partition_paths) def copy_and_replace_query(self, query_path, replacement_list): @@ -329,13 +250,6 @@ class WeatherBenchmark: for line in fileinput.input(query_path + query_file, True): sys.stdout.write(line.replace(self.QUERY_REPLACEMENT_KEY + collection, replace_string)) - # Make a search replace for partition type. - if self.dataset.get_partition_type() == "large_files": - for line in fileinput.input(query_path + query_file, True): - sys.stdout.write(line.replace("/stationCollection", "/" + self.LARGE_FILE_ROOT_TAG + "/stationCollection")) - for line in fileinput.input(query_path + query_file, True): - sys.stdout.write(line.replace("/dataCollection", "/" + self.LARGE_FILE_ROOT_TAG + "/dataCollection")) - def get_number_of_slices(self): if len(self.dataset.get_tests()) == 0: print "No test has been defined in config file." http://git-wip-us.apache.org/repos/asf/vxquery/blob/94acdb09/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py index 8ac6d17..5bfa698 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_cli.py @@ -207,18 +207,15 @@ def main(argv): print 'Processing the partition section (' + dataset.get_name() + ':d' + str(len(base_paths)) + ':s' + str(slices) + ').' data.reset() if section == "partition_scheme": - benchmark.print_partition_scheme() + benchmark.print_partition_scheme(xml_data_save_path) else: - if dataset.get_partition_type() == "large_files": - data.build_to_n_partition_files(xml_data_save_path, slices, base_paths, reset) - else: - data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset) + data.copy_to_n_partitions(xml_data_save_path, slices, base_paths, reset) if section in ("all", "test_links"): # TODO determine current node print 'Processing the test links section (' + dataset.get_name() + ').' - benchmark.print_partition_scheme() - benchmark.build_data_links(reset) + benchmark.print_partition_scheme(xml_data_save_path) + benchmark.build_data_links(xml_data_save_path) if section in ("all", "queries"): print 'Processing the queries section (' + dataset.get_name() + ').' http://git-wip-us.apache.org/repos/asf/vxquery/blob/94acdb09/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py index 80607b8..a6513c2 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_config.py @@ -41,10 +41,9 @@ class WeatherConfig: for node in self.config.getElementsByTagName("dataset"): name = self.get_dataset_name(node) save_paths = self.get_dataset_save_paths(node) - partition_type = self.get_dataset_partition_type(node) partitions = self.get_dataset_partitions(node) tests = self.get_dataset_tests(node) - nodes.append(Dataset(name, save_paths, partition_type, partitions, tests)) + nodes.append(Dataset(name, save_paths, partitions, tests)) return nodes @@ -70,9 +69,6 @@ class WeatherConfig: paths.append(self.get_text(item)) return paths - def get_dataset_partition_type(self, node): - return self.get_text(node.getElementsByTagName("partition_type")[0]) - def get_dataset_partitions(self, node): paths = [] for item in node.getElementsByTagName("partitions_per_path"): @@ -107,11 +103,10 @@ class Machine: return self.id + "(" + self.ip + ")" class Dataset: - def __init__(self, name, save_paths, partition_type, partitions, tests): + def __init__(self, name, save_paths, partitions, tests): self.name = name self.save_paths = save_paths self.partitions = partitions - self.partition_type = partition_type self.tests = tests def get_name(self): @@ -123,9 +118,6 @@ class Dataset: def get_partitions(self): return self.partitions - def get_partition_type(self): - return self.partition_type - def get_tests(self): return self.tests http://git-wip-us.apache.org/repos/asf/vxquery/blob/94acdb09/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py index 5db090a..a4f33a1 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_convert_to_xml.py @@ -150,6 +150,10 @@ class WeatherConvertToXML: # Default return 0 + def process_one_day(self, records, report_date, page): + # Default + return 0 + def process_station_data(self, row): # Default return 0 @@ -161,7 +165,7 @@ class WeatherConvertToXML: print "Processing inventory file" file_stream = open(self.ghcnd_inventory, 'r') - csv_header = ['ID', 'SENSORS', 'SENSORS_COUNT', 'MAX_YEARS', 'TOTAL_YEARS_FOR_ALL_SENSORS'] + csv_header = ['ID', 'SENSORS', 'SENSORS_COUNT', 'MAX_YEARS', 'TOTAL_YEARS_FOR_ALL_SENSORS'] row = file_stream.readline() csv_inventory = {} for row in file_stream: @@ -239,6 +243,16 @@ class WeatherConvertToXML: def convert_c2f(self, c): return (9 / 5 * c) + 32 + def default_xml_start(self): + return textwrap.dedent("""\ + <?xml version="1.0" encoding="ISO-8859-1"?> + <ghcnd_observation version="1.0" + xmlns:xsd="http://www.w3.org/2001/XMLSchema" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"> + <credit>NOAA's National Climatic Data Center (NCDC)</credit> + <credit_URL>http://www.ncdc.noaa.gov/</credit_URL> + """) + def default_xml_web_service_start(self): field_xml = "" field_xml += "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>\n" @@ -259,6 +273,76 @@ class WeatherConvertToXML: field_xml += self.get_indent_space(indent) + "<date>" + str(report_date.year) + "-" + str(report_date.month).zfill(2) + "-" + str(report_date.day).zfill(2) + "T00:00:00.000</date>\n" return field_xml + def get_date_from_field(self, row, field): + report_date = self.get_field_from_definition(row, field) + return str(report_date.year) + "-" + str(report_date.month).zfill(2) + "-" + str(report_date.day).zfill(2) + + def default_xml_field_date_iso8601(self, report_date): + field_xml = "" + field_xml += " <observation_date>" + self.MONTHS[report_date.month - 1] + " " + str(report_date.day) + ", " + str(report_date.year) + "</observation_date>\n" + field_xml += " <observation_date_iso8601>" + report_date.isoformat() + "</observation_date_iso8601>\n" + return field_xml + + def default_xml_field_date_year(self, year): + field_xml = "" + field_xml += " <observation_year>" + str(year) + "</observation_year>\n" + return field_xml + + def default_xml_field_date_month(self, month): + field_xml = "" + field_xml += " <observation_month>" + str(month) + "</observation_month>\n" + return field_xml + + def default_xml_field_date_day(self, day): + field_xml = "" + field_xml += " <observation_day>" + str(day) + "</observation_day>\n" + return field_xml + + def default_xml_field_station_id(self, station_id, indent=2): + field_xml = "" + field_xml += self.get_indent_space(indent) + "<station_id>" + station_id + "</station_id>\n" + return field_xml + + def default_xml_field_station(self, station_id): + station_row = "" + stations_file = open(self.ghcnd_stations, 'r') + + for line in stations_file: + if station_id == self.get_field_from_definition(line, STATIONS_FIELDS['ID']): + station_row = line + break + + field_xml = "" + field_xml += " <station_id>" + station_id + "</station_id>\n" + field_xml += " <location>\n" + field_xml += " <latitude>" + self.get_field_from_definition(station_row, STATIONS_FIELDS['LATITUDE']).strip() + "</latitude>\n" + field_xml += " <longitude>" + self.get_field_from_definition(station_row, STATIONS_FIELDS['LONGITUDE']).strip() + "</longitude>\n" + + elevation = self.get_field_from_definition(station_row, STATIONS_FIELDS['ELEVATION']).strip() + if elevation != "-999.9": + field_xml += " <elevation>" + elevation + "</elevation>\n" + + field_xml += " </location>\n" + field_xml += " <name>" + self.get_field_from_definition(station_row, STATIONS_FIELDS['NAME']).strip() + "</name>\n" + + state = self.get_field_from_definition(station_row, STATIONS_FIELDS['STATE']) + if state.strip(): + field_xml += " <state>" + state + "</state>\n" + + gsn = self.get_field_from_definition(station_row, STATIONS_FIELDS['GSNFLAG']) + if gsn.strip(): + field_xml += " <gsn />\n" + + hcn = self.get_field_from_definition(station_row, STATIONS_FIELDS['HCNFLAG']) + if hcn.strip(): + field_xml += " <hcn />\n" + + wmoid = self.get_field_from_definition(station_row, STATIONS_FIELDS['WMOID']) + if wmoid.strip(): + field_xml += " <wmoid id=\"" + wmoid + "\" />\n" + + return field_xml + def default_xml_mshr_station_additional(self, station_id): """The web service station data is generate from the MSHR data supplemented with GHCN-Daily.""" station_mshr_row = "" @@ -408,6 +492,120 @@ class WeatherConvertToXML: def get_indent_space(self, indent): return (" " * (4 * indent)) +class WeatherDailyXMLFile(WeatherConvertToXML): + def process_one_month_sensor_set(self, records, page): + year = int(self.get_dly_field(records[0], DLY_FIELD_YEAR)) + month = int(self.get_dly_field(records[0], DLY_FIELD_MONTH)) + + station_id = self.get_dly_field(records[0], DLY_FIELD_ID) + + count = 0 + for day in range(1, 32): + try: + # TODO find out what is a valid python date range? 1889? + # Attempt to see if this is valid date. + report_date = date(year, month, day) + save_file_name = self.process_one_day(records, report_date, page) + if save_file_name is not "": + count = count + 1 + if self.debug_output: + print "Wrote file: " + save_file_name + except ValueError: + if self.debug_output: + print "Error: Not a valid date (" + str(month) + "/" + str(day) + "/" + str(year) + ") for " + station_id + "." + pass + return count + + def process_one_day(self, records, report_date, page): + station_id = self.get_dly_field(records[0], DLY_FIELD_ID) + found_data = False + + # Information for each daily file. + daily_xml_file = self.default_xml_start() + daily_xml_file += self.default_xml_field_station(station_id) + daily_xml_file += self.default_xml_field_date_iso8601(report_date) + daily_xml_file += self.default_xml_start_tag("sensors") + for record in records: + record_xml_snip = self.default_xml_day_reading_as_field(record, report_date.day) + if record_xml_snip is not "": + found_data = True + daily_xml_file += record_xml_snip + daily_xml_file += self.default_xml_end_tag("sensors") + daily_xml_file += self.default_xml_end() + + if not found_data: + return "" + + # Make sure the station folder is available. + ghcnd_xml_station_path = self.get_base_folder(station_id) + "/" + station_id + "/" + str(report_date.year) + "/" + if not os.path.isdir(ghcnd_xml_station_path): + os.makedirs(ghcnd_xml_station_path) + + # Save XML string to disk. + save_file_name = ghcnd_xml_station_path + build_sensor_save_filename(station_id, report_date, page) + save_file_name = self.save_file(save_file_name, daily_xml_file) + + return save_file_name + +class WeatherMonthlyXMLFile(WeatherConvertToXML): + def process_one_month_sensor_set(self, records, page): + found_data = False + year = int(self.get_dly_field(records[0], DLY_FIELD_YEAR)) + month = int(self.get_dly_field(records[0], DLY_FIELD_MONTH)) + + station_id = self.get_dly_field(records[0], DLY_FIELD_ID) + + # Information for each daily file. + daily_xml_file = self.default_xml_start() + daily_xml_file += self.default_xml_field_station(station_id) + daily_xml_file += self.default_xml_field_date_year(year) + daily_xml_file += self.default_xml_field_date_month(month) + daily_xml_file += self.default_xml_start_tag("readings") + + for day in range(1, 32): + try: + # TODO find out what is a valid python date range? 1889? + # Attempt to see if this is valid date. + report_date = date(year, month, day) + found_daily_data = False + record_xml_snip = "" + + for record in records: + record_xml_snip += self.default_xml_day_reading_as_field(record, report_date.day) + if record_xml_snip is not "": + found_data = True + found_daily_data = True + + if found_daily_data: + daily_xml_file += self.default_xml_start_tag("reading", 2) + daily_xml_file += self.default_xml_field_date_day(day) + daily_xml_file += record_xml_snip + daily_xml_file += self.default_xml_end_tag("reading", 2) + + except ValueError: + pass + + daily_xml_file += self.default_xml_end_tag("readings") + daily_xml_file += self.default_xml_end() + + if not found_data: + return 0 + + # Make sure the station folder is available. + ghcnd_xml_station_path = self.get_base_folder(station_id) + "/" + station_id + "/" + str(report_date.year) + "/" + if not os.path.isdir(ghcnd_xml_station_path): + os.makedirs(ghcnd_xml_station_path) + + # Save XML string to disk. + save_file_name = ghcnd_xml_station_path + build_sensor_save_filename(station_id, report_date, page) + save_file_name = self.save_file(save_file_name, daily_xml_file) + + if save_file_name is not "": + if self.debug_output: + print "Wrote file: " + save_file_name + return 1 + else: + return 0 class WeatherWebServiceMonthlyXMLFile(WeatherConvertToXML): """The web service class details how to create files similar to the NOAA web service.""" http://git-wip-us.apache.org/repos/asf/vxquery/blob/94acdb09/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py index 1c9f129..a7fb691 100644 --- a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_data_files.py @@ -26,8 +26,6 @@ from collections import OrderedDict # Allows partition and picking up where you left off. class WeatherDataFiles: - LARGE_FILE_ROOT_TAG = "root" - INDEX_DATA_FILE_NAME = 0 INDEX_DATA_SENSORS_STATUS = 1 INDEX_DATA_STATION_STATUS = 2 @@ -53,10 +51,14 @@ class WeatherDataFiles: self.current = self.DATA_FILE_START_INDEX self.progress_data = [] + + def get_file_list(self): + return glob.glob(self.base_path + "/*" + self.DATA_FILE_EXTENSION) + def get_file_list_iterator(self): - """Return the list of files one at a time.""" return glob.iglob(self.base_path + "/*" + self.DATA_FILE_EXTENSION) + # Save Functions def build_progress_file(self, options, convert): if not os.path.isfile(self.progress_file_name) or 'reset' in options: @@ -96,8 +98,10 @@ class WeatherDataFiles: return # Initialize the partition paths. + partition_sizes = [] partition_paths = get_partition_paths(0, partitions, base_paths) for path in partition_paths: + partition_sizes.append(0) # Make sure the xml folder is available. prepare_path(path, reset) @@ -141,72 +145,66 @@ class WeatherDataFiles: if current_station_partition >= len(partition_paths): current_station_partition = 0 - def build_to_n_partition_files(self, save_path, partitions, base_paths, reset): - """Once the initial data has been generated, the data can be divided into partitions - and stored in single files. - """ + + def copy_to_n_partitions_by_station(self, save_path, partitions, base_paths, reset): + """Once the initial data has been generated, the data can be copied into a set number of partitions. """ if (len(base_paths) == 0): return - XML_START = "<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>" - # Initialize the partition paths. + partition_sizes = [] partition_paths = get_partition_paths(0, partitions, base_paths) - sensors_partition_files = [] - stations_partition_files = [] for path in partition_paths: + partition_sizes.append(0) # Make sure the xml folder is available. prepare_path(path, reset) - prepare_path(path + "sensors/", False) - prepare_path(path + "stations/", False) - sensors_partition_files.append(open(path + "sensors/partition.xml", 'w')) - stations_partition_files.append(open(path + "stations/partition.xml", 'w')) - - for row in range(0, len(partition_paths)): - sensors_partition_files[row].write(XML_START + "<" + self.LARGE_FILE_ROOT_TAG + ">\n") - stations_partition_files[row].write(XML_START + "<" + self.LARGE_FILE_ROOT_TAG + ">\n") - import fnmatch - import os - # copy stations and sensors into each partition - current_sensor_partition = 0 - current_station_partition = 0 - self.open_progress_data() - row_count = len(self.progress_data) - for row in range(0, row_count): - row_contents = self.progress_data[row].rsplit(self.SEPERATOR) - file_name = row_contents[self.INDEX_DATA_FILE_NAME] - station_id = os.path.basename(file_name).split('.')[0] - + current_partition = 0 + csv_sorted = self.get_csv_in_partition_order() + for item, size in csv_sorted.iteritems(): + if size < 0: + print "The progress file does not have the sensor size data saved." + return + + station_id = item.split('.')[0] + # Update partition bases on smallest current size. + current_partition = partition_sizes.index(min(partition_sizes)) + # Copy sensor files type = "sensors" file_path = build_base_save_folder(save_path, station_id, type) + station_id - for root, dirnames, filenames in os.walk(file_path): - for filename in fnmatch.filter(filenames, '*.xml'): - xml_path = os.path.join(root, filename) - xml_data = file_get_contents(xml_path).replace(XML_START, "") + "\n" - sensors_partition_files[current_sensor_partition].write(xml_data) - current_sensor_partition += 1 - if current_sensor_partition >= len(sensors_partition_files): - current_sensor_partition = 0 - + new_file_path = build_base_save_folder(partition_paths[current_partition], station_id, type) + station_id + if os.path.isdir(file_path): + distutils.dir_util.copy_tree(file_path, new_file_path) + partition_sizes[current_partition] += size + # Copy station files type = "stations" file_path = build_base_save_folder(save_path, station_id, type) + station_id + ".xml" - xml_path = os.path.join(root, file_path) - xml_data = file_get_contents(xml_path).replace(XML_START, "") + "\n" - stations_partition_files[current_station_partition].write(xml_data) - current_station_partition += 1 - if current_station_partition >= len(partition_paths): - current_station_partition = 0 - - for row in range(0, len(partition_paths)): - sensors_partition_files[row].write("</" + self.LARGE_FILE_ROOT_TAG + ">\n") - sensors_partition_files[row].close() - stations_partition_files[row].write("</" + self.LARGE_FILE_ROOT_TAG + ">\n") - stations_partition_files[row].close() - + new_file_base = build_base_save_folder(partition_paths[current_partition], station_id, type) + new_file_path = new_file_base + station_id + ".xml" + if os.path.isfile(file_path): + if not os.path.isdir(new_file_base): + os.makedirs(new_file_base) + shutil.copyfile(file_path, new_file_path) + + def get_csv_in_partition_order(self): + self.open_progress_data() + row_count = len(self.progress_data) + + # Get the dictionary of all the files and data sizes. + csv_dict = dict() + for row in range(0, row_count): + row_contents = self.progress_data[row].rsplit(self.SEPERATOR) + file_name = row_contents[self.INDEX_DATA_FILE_NAME] + folder_data = int(row_contents[self.INDEX_DATA_FOLDER_DATA]) + + csv_dict[file_name] = folder_data + + # New sorted list. + return OrderedDict(sorted(csv_dict.items(), key=lambda x: x[1], reverse=True)) + def get_file_row(self, file_name): for i in range(0, len(self.progress_data)): if self.progress_data[i].startswith(file_name): @@ -407,6 +405,3 @@ def prepare_path(path, reset): if not os.path.isdir(path): os.makedirs(path) -def file_get_contents(filename): - with open(filename) as f: - return f.read() http://git-wip-us.apache.org/repos/asf/vxquery/blob/94acdb09/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example.xml ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example.xml b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example.xml new file mode 100644 index 0000000..94c1440 --- /dev/null +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example.xml @@ -0,0 +1,34 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<data xmlns="data"> + <name>Local Example</name> + <save_path>/data</save_path> + <package>ghcnd_all</package> + <node> + <id>localhost</id> + <cluster_ip>127.0.0.1</cluster_ip> + </node> + <dataset> + <name>tiny-example</name> + <test>local_speed_up</test> + <save_path>/data</save_path> + <partitions_per_path>1</partitions_per_path> + <partitions_per_path>2</partitions_per_path> + <partitions_per_path>4</partitions_per_path> + <partitions_per_path>8</partitions_per_path> + </dataset> +</data> \ No newline at end of file http://git-wip-us.apache.org/repos/asf/vxquery/blob/94acdb09/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example_cluster.xml ---------------------------------------------------------------------- diff --git a/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example_cluster.xml b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example_cluster.xml new file mode 100644 index 0000000..6078627 --- /dev/null +++ b/vxquery-benchmark/src/main/resources/noaa-ghcn-daily/scripts/weather_example_cluster.xml @@ -0,0 +1,56 @@ +<!-- + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--> +<data xmlns="data"> + <name>Cluster Example</name> + <save_path>/data</save_path> + <package>ghcnd_all</package> + <node> + <id>machine1</id> + <cluster_ip>127.0.0.1</cluster_ip> + </node> + <node> + <id>machine2</id> + <cluster_ip>127.0.0.2</cluster_ip> + </node> + <node> + <id>machine3</id> + <cluster_ip>127.0.0.3</cluster_ip> + </node> + <node> + <id>machine4</id> + <cluster_ip>127.0.0.4</cluster_ip> + </node> + <node> + <id>machine5</id> + <cluster_ip>127.0.0.5</cluster_ip> + </node> + <dataset> + <name>tiny-1drive</name> + <test>speed_up</test> + <test>batch_scale_out</test> + <save_path>/data</save_path> + <partitions_per_path>1</partitions_per_path> + </dataset> + <dataset> + <name>small-2drives</name> + <test>speed_up</test> + <test>batch_scale_out</test> + <save_path>/data</save_path> + <save_path>/data2</save_path> + <partitions_per_path>1</partitions_per_path> + </dataset> +</data> \ No newline at end of file
