This is an automated email from the ASF dual-hosted git repository.

granthenke pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 13567924fcb4d0605d63c1ab96b7182755ea3ce5
Author: Grant Henke <[email protected]>
AuthorDate: Mon Dec 17 16:39:05 2018 -0600

    [scripts] Add initial test scripts for backup/restore testing
    
    Adds a rough script to test the backup and restore
    functionality. This script is rough around the edges but
    I would like to get a base script committed for collaboration purposes.
    
    Change-Id: I0c8efc3778b20687f0c1bc4c825f49f8f24e6d3b
    Reviewed-on: http://gerrit.cloudera.org:8080/12212
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <[email protected]>
---
 src/kudu/scripts/backup-perf.py | 343 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 343 insertions(+)

diff --git a/src/kudu/scripts/backup-perf.py b/src/kudu/scripts/backup-perf.py
new file mode 100755
index 0000000..430f6f4
--- /dev/null
+++ b/src/kudu/scripts/backup-perf.py
@@ -0,0 +1,343 @@
+#!/usr/bin/env python
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#################################################################
+# WARNING: This code is not stable and is intended for internal testing only.
+#
+# Backup and Restore performance test driver.
+#
+# Example invocation:
+#   backup-perf.py \
+#      --spark-submit-command spark2-submit \
+#      --kudu-spark-tools-jar kudu-spark2-tools_2.11-1.9.0-SNAPSHOT.jar \
+#      --kudu-backup-jar kudu-backup2_2.11-1.9.0-SNAPSHOT.jar \
+#      --load-num-tasks 20 \
+#      --master-addresses a123.example.com \
+#      --impalad-address a123.example.com \
+#      --backup-path hdfs:///user/foo/backups \
+#      --partitions 450 \
+#      --table-data-size-mb 500000 \
+#      test_table_1
+#
+# Every step is disabled by default; enable the ones to run, for example
+# with: --create-table true --load-table true --backup-table true
+#################################################################
+
+import argparse
+import datetime
+import json
+import subprocess
+import sys
+import timeit
+
+from collections import OrderedDict
+
+class TickingTimer:
+  """ Stopwatch that reports the time elapsed between successive ticks. """
+  def __init__(self):
+    # The construction time is the reference point for the first tick.
+    self.last_tick_ = timeit.default_timer()
+
+  def tick(self):
+    """
+    Records a new tick and returns the time elapsed since the previous
+    tick, or since construction if tick() has not been called before.
+    """
+    now = timeit.default_timer()
+    elapsed = now - self.last_tick_
+    self.last_tick_ = now
+    return elapsed
+
+  def last_tick_time(self):
+    """
+    Returns the timer reading captured by the most recent tick(), or at
+    construction if tick() has not been called yet.
+    """
+    return self.last_tick_
+
+def check_output(*popenargs, **kwargs):
+  """Run a command and return everything it wrote to stdout.
+
+  Pure-Python backport of subprocess.check_output() from Python 2.7.
+  All arguments are passed straight through to subprocess.Popen(), with
+  stdout redirected to a pipe.
+
+  Raises subprocess.CalledProcessError if the command exits with a non-zero
+  status; the captured output is attached to the exception as `output`.
+  """
+  proc = subprocess.Popen(*popenargs, stdout=subprocess.PIPE, **kwargs)
+  captured = proc.communicate()[0]
+  status = proc.poll()
+  if not status:
+    return captured
+  # Report the command the way Popen received it: the `args` keyword takes
+  # precedence, otherwise it is the first positional argument.
+  failed_cmd = kwargs.get("args")
+  if failed_cmd is None:
+    failed_cmd = popenargs[0]
+  failure = subprocess.CalledProcessError(status, failed_cmd)
+  failure.output = captured
+  raise failure
+
+def parse_bool(s):
+  """
+  Converts the strings 'true' and 'false' (matched case-insensitively) to
+  the corresponding bool. Raises argparse.ArgumentTypeError for any other
+  value.
+  """
+  lowered = s.lower()
+  if lowered == 'true':
+    return True
+  if lowered == 'false':
+    return False
+  raise argparse.ArgumentTypeError('value must be true or false')
+
+def timestamp():
+  """ Returns the current local date and time as an ISO 8601 string. """
+  now = datetime.datetime.now()
+  return now.isoformat()
+
+def run_command(opts, cmd):
+  """
+  Echo `cmd`, then execute it through the shell and print its output.
+  With opts.dryrun set the command is only echoed, never executed.
+  """
+  print(cmd)
+  if opts.dryrun:
+    return
+  command_output = check_output(cmd, shell=True)
+  print(command_output)
+
+def ensure_commands_available(opts):
+  """
+  Look up the external tools this script shells out to using `which`.
+  A tool that is missing makes `which` exit non-zero, which run_command
+  surfaces as an error (unless in dry-run mode).
+  """
+  for tool in ('impala-shell', 'kudu'):
+    run_command(opts, 'which %s' % (tool, ))
+
+def get_restored_table_name(opts):
+  """ Returns the original table name with the restore suffix appended. """
+  base_name = opts.table_name
+  suffix = opts.table_restore_suffix
+  return base_name + suffix
+
+# TODO: Change this to use the Kudu python API.
+#  It's good dog-fooding and removes the Impala requirement.
+def create_table(opts, stats):
+  """
+  Create a Kudu table via impala-shell.
+
+  The table has opts.columns columns named f0, f1, ...: the leading
+  (opts.columns - opts.num_string_columns) columns are BIGINT and the rest
+  are STRING. f0 is the primary key and the hash partitioning column.
+  `stats` is not modified by this step.
+  """
+  print("--------------------------------------")
+  print("Creating table %s" % (opts.table_name,))
+  print("--------------------------------------")
+  print(timestamp())
+  num_bigint_cols = opts.columns - opts.num_string_columns
+  # Guarantees at least one BIGINT column, so the f0 primary key is BIGINT.
+  assert(num_bigint_cols > 0)
+  column_defs = []
+  for i in range(opts.columns):
+    coltype = 'BIGINT' if i < num_bigint_cols else 'STRING'
+    column_def = "f%d %s" % (i, coltype)
+    if i == 0: column_def += ' PRIMARY KEY'
+    column_defs.append(column_def)
+  create_table_ddl = (
+      "CREATE TABLE %s (%s) "
+      "PARTITION BY HASH(f0) PARTITIONS %d STORED AS KUDU "
+      "TBLPROPERTIES ('kudu.num_tablet_replicas' = '%d')" %
+      (opts.table_name, ', '.join(column_defs), opts.partitions,
+       opts.replication_factor))
+
+  # The DDL is piped to impala-shell on stdin ("-f -").
+  cmd = 'echo "%s" | impala-shell -i %s -f -' % \
+        (create_table_ddl, opts.impalad_address)
+  run_command(opts, cmd)
+
+def drop_created_table(opts, stats):
+  """ Issue a DROP TABLE for opts.table_name through impala-shell. """
+  banner = "--------------------------------------"
+  print(banner)
+  print("Dropping created table %s" % (opts.table_name, ))
+  print(banner)
+  print(timestamp())
+  drop_statement = "DROP TABLE %s" % (opts.table_name, )
+  # The statement is piped to impala-shell on stdin ("-f -").
+  run_command(opts, 'echo "%s" | impala-shell -i %s -f -' %
+              (drop_statement, opts.impalad_address))
+
+def drop_restored_table(opts, stats):
+  """ Delete the restored Kudu table with the `kudu table delete` CLI. """
+  # TODO: This may no longer be needed if and when we integrate
+  # restoring HMS metadata and the table is restored as "Impala-managed".
+  banner = "--------------------------------------"
+  print(banner)
+  restored_name = get_restored_table_name(opts)
+  print("Dropping restored table %s" % (restored_name, ))
+  print(banner)
+  print(timestamp())
+  # The CLI is given the table name including opts.table_prefix.
+  full_table_name = opts.table_prefix + restored_name
+  run_command(opts, 'kudu table delete %s %s' %
+              (opts.master_addresses, full_table_name))
+
+def load_table(opts, stats):
+  """
+  Load a table with data using the DistributedDataGenerator spark job.
+
+  The row count is derived from opts.table_data_size_mb and an estimated
+  per-row size. Records 'row_size_bytes' and 'num_rows' in `stats`.
+  """
+  print("--------------------------------------")
+  print("Loading table %s" % (opts.table_name,))
+  print("--------------------------------------")
+  print(timestamp())
+  # Example invocation:
+  #   spark-submit \
+  #     --class org.apache.kudu.spark.tools.DistributedDataGenerator \
+  #     kudu-spark2-tools_2.11-1.8.0-SNAPSHOT.jar \
+  #     --type random \
+  #     --num-rows 10000000 \
+  #     --num-tasks 20 \
+  #     impala::default.foo_test3 m123.example.com
+  CLASS_NAME = 'org.apache.kudu.spark.tools.DistributedDataGenerator'
+  # TODO: Non-string columns are assumed to be 8 bytes.
+  row_size_bytes = opts.num_string_columns * opts.string_field_len + \
+                   (opts.columns - opts.num_string_columns) * 8
+  # Floor division keeps num_rows an integer on Python 3 as well; true
+  # division would record a float in the stats output.
+  num_rows = opts.table_data_size_mb * 1024 * 1024 // row_size_bytes
+  print("INFO: Inserting %d rows of %d bytes each" %
+        (num_rows, row_size_bytes))
+  stats['row_size_bytes'] = row_size_bytes
+  stats['num_rows'] = num_rows
+  full_table_name = opts.table_prefix + opts.table_name
+  cmd = "%s --class %s %s --type %s --num-rows %d --num-tasks %d %s %s" % \
+    (opts.spark_submit_command, CLASS_NAME, opts.kudu_spark_tools_jar,
+     opts.load_policy, num_rows, opts.load_num_tasks, full_table_name,
+     opts.master_addresses)
+  run_command(opts, cmd)
+
+def backup_table(opts, stats):
+  """
+  Back up the table to opts.backup_path by submitting the KuduBackup
+  spark job. `stats` is not modified by this step.
+  """
+  print("--------------------------------------")
+  print("Backing up table %s" % (opts.table_name,))
+  print("--------------------------------------")
+  print(timestamp())
+  CLASS_NAME = "org.apache.kudu.backup.KuduBackup"
+  # The format string is split across adjacent literals to keep lines
+  # short; the resulting command is a single line.
+  cmd = ("%s --class %s %s --kuduMasterAddresses %s "
+         "--scanRequestTimeoutMs %d --path %s %s" %
+         (opts.spark_submit_command, CLASS_NAME, opts.kudu_backup_jar,
+          opts.master_addresses, opts.scan_request_timeout_ms,
+          opts.backup_path, opts.table_prefix + opts.table_name))
+  run_command(opts, cmd)
+
+def restore_table(opts, stats):
+  """
+  Restore the table from opts.backup_path by submitting the KuduRestore
+  spark job, passing opts.table_restore_suffix as --tableSuffix. `stats`
+  is not modified by this step.
+  """
+  print("--------------------------------------")
+  print("Restoring table %s as %s" %
+        (opts.table_name, get_restored_table_name(opts)))
+  print("--------------------------------------")
+  print(timestamp())
+  CLASS_NAME = "org.apache.kudu.backup.KuduRestore"
+  # The format string is split across adjacent literals to keep lines
+  # short; the resulting command is a single line.
+  cmd = ("%s --class %s %s --tableSuffix %s "
+         "--kuduMasterAddresses %s --path %s %s" %
+         (opts.spark_submit_command, CLASS_NAME, opts.kudu_backup_jar,
+          opts.table_restore_suffix, opts.master_addresses,
+          opts.backup_path, opts.table_prefix + opts.table_name))
+  run_command(opts, cmd)
+
+def parse_args():
+  """
+  Parse command-line arguments.
+
+  Returns the argparse namespace. Every test step (create, load, backup,
+  restore and the two drops) is an explicit true/false option that
+  defaults to false.
+  """
+  parser = argparse.ArgumentParser(
+      description='Run a Kudu backup and restore performance test',
+      formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+
+  # Kudu Configuration
+  parser.add_argument('--master-addresses', required=True,
+                      help='The Kudu master addresses')
+
+  # Impala Configuration
+  parser.add_argument('--impalad-address', required=True,
+                      help='The Impala daemon address')
+  parser.add_argument('--table-prefix', default='impala::default.',
+                      help='Kudu table name prefix in the Hive metastore')
+
+  # Spark Job Configuration
+  parser.add_argument('--spark-submit-command', default='spark-submit',
+                      help='The name of the spark-submit binary')
+  ## Spark Loader Configuration
+  parser.add_argument('--kudu-spark-tools-jar',
+                      default='kudu-spark*-tools*.jar',
+                      help='The path to the kudu-spark-tools jar '
+                           '(for --load-table)')
+  parser.add_argument('--load-num-tasks', type=int, default=20,
+                      help='Number of Spark tasks to create when loading data')
+  parser.add_argument('--load-policy', default='sequential',
+                      choices=['sequential', 'random'],
+                      help='The data loading policy for the data generator')
+  parser.add_argument('--string-field-len', type=int, default=128,
+                      help='The length, in bytes, of generated string '
+                           'column values')
+
+  ## Spark Backup/Restore Job Configuration
+  parser.add_argument('--kudu-backup-jar', default='kudu-backup*.jar',
+                      help='The path to the kudu-backup jar')
+  parser.add_argument('--backup-path', default='hdfs:///kudu-backup-tests',
+                      help='The Hadoop-compatible path at which to store '
+                           'the backup')
+  # NOTE(review): this option is parsed but nothing in this script reads it.
+  parser.add_argument('--backup-file-format', default='parquet',
+                      help='The file format of the backup: must be parquet')
+  parser.add_argument('--scan-request-timeout-ms', type=int, default=30000,
+                      help='The default scan timeout for backup, in '
+                           'milliseconds')
+  parser.add_argument('--table-restore-suffix', default='-restore',
+                      help='Kudu table name suffix to append on restore')
+
+  # Table Configuration
+  parser.add_argument('--columns', type=int, default=10,
+                      help='The number of columns in the Kudu table')
+  parser.add_argument('--num-string-columns', type=int, default=9,
+                      help='The number of string columns in the table; '
+                           'the rest will be bigints')
+  parser.add_argument('--partitions', type=int, default=10,
+                      help='The number of hash partitions of the table. '
+                           'This script only supports hash partitions')
+  parser.add_argument('--table-data-size-mb', type=int, default=1024,
+                      help='The uncompressed data size of the table, in MB')
+  parser.add_argument('--replication-factor', type=int, default=3,
+                      help='The replication factor of the table')
+
+  # Positional
+  parser.add_argument('table_name',
+                      help='The name of the Kudu table to create/backup')
+
+  # Actions. All share the same declaration, so only the flag and help
+  # text vary; the order here is the order shown in --help.
+  actions = [
+    ('--create-table',
+     'Whether to create the table for loading'),
+    ('--drop-created-table',
+     'Whether to drop the created table after a successful test run'),
+    ('--load-table',
+     'Whether to load the table with data'),
+    ('--backup-table',
+     'Whether to back up the table'),
+    ('--restore-table',
+     'Whether to restore the table'),
+    ('--drop-restored-table',
+     'Whether to drop the restored table after a successful test run'),
+  ]
+  for flag, help_text in actions:
+    parser.add_argument(flag, type=parse_bool, choices=[True, False],
+                        default=False, help=help_text)
+
+  # Utility
+  parser.add_argument('--dryrun', action='store_true',
+                      help='Do not execute any commands, only print what '
+                           'would be executed')
+
+  return parser.parse_args()
+
+def main():
+  """
+  Run the steps enabled on the command line, timing each one, then print
+  the collected configuration and timings as a JSON document between
+  "[ BEGIN STATS ]" and "[ END STATS ]" markers.
+  """
+  start_timestamp = timestamp()
+  print(start_timestamp)
+  print("Starting perf test...")
+
+  opts = parse_args()
+
+  # An OrderedDict keeps the stats in insertion order in the JSON output.
+  stats = OrderedDict()
+  stats['start_timestamp'] = start_timestamp
+
+  stats['columns'] = opts.columns
+  stats['num_string_columns'] = opts.num_string_columns
+  stats['partitions'] = opts.partitions
+  stats['table_data_size_mb'] = opts.table_data_size_mb
+  stats['replication_factor'] = opts.replication_factor
+  stats['load_num_tasks'] = opts.load_num_tasks
+  stats['load_policy'] = opts.load_policy
+  stats['string_field_len'] = opts.string_field_len
+
+  timer = TickingTimer()
+  start = timer.last_tick_time()
+
+  # (option name, step function) pairs in execution order. The option name
+  # doubles as the prefix of the step's '<name>_duration_sec' stats key.
+  steps = [
+    ('create_table', create_table),
+    ('load_table', load_table),
+    ('backup_table', backup_table),
+    ('restore_table', restore_table),
+    ('drop_created_table', drop_created_table),
+    ('drop_restored_table', drop_restored_table),
+  ]
+  for step_name, step in steps:
+    if getattr(opts, step_name):
+      step(opts, stats)
+      stats[step_name + '_duration_sec'] = timer.tick()
+
+  # The timer only advances when a step runs, so the total covers the
+  # period from timer creation to the end of the last executed step.
+  end = timer.last_tick_time()
+
+  stats['end_timestamp'] = timestamp()
+  print(stats['end_timestamp'])
+  print("Ending perf test")
+  total_duration = end - start
+  stats['total_duration_sec'] = total_duration
+  print("Total time elapsed: %s s" % (total_duration, ))
+
+  print("")
+  print("--------------------------------------")
+  print("[ BEGIN STATS ]")
+  print(json.dumps(stats,
+                   indent=4, separators=(',', ': ')))
+  print("[ END STATS ]")
+  print("--------------------------------------")
+
+# Run the test driver only when this file is executed as a script.
+if __name__ == "__main__":
+  main()

Reply via email to