This is an automated email from the ASF dual-hosted git repository.

agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-benchmarks.git


The following commit(s) were added to refs/heads/main by this push:
     new 97ee9f7  Use int64 for keys, make columns non-nullable (#4)
97ee9f7 is described below

commit 97ee9f7ad96a1403a16c675e8941883b827bf798
Author: Andy Grove <[email protected]>
AuthorDate: Thu May 30 17:45:38 2024 -0600

    Use int64 for keys, make columns non-nullable (#4)
    
    * Use int64 for keys, make columns non-nullable
    
    * separate generate and convert commands
---
 tpch/README.md  |  7 +++--
 tpch/tpchgen.py | 92 ++++++++++++++++++++++++++++++++-------------------------
 2 files changed, 57 insertions(+), 42 deletions(-)

diff --git a/tpch/README.md b/tpch/README.md
index c1bede1..495dd2f 100644
--- a/tpch/README.md
+++ b/tpch/README.md
@@ -38,10 +38,13 @@ Data can be generated as a single Parquet file per table by specifying `--partit
 Data will be generated into a `data` directory in the current working directory.
 
 ```shell
-python tpchgen.py --scale-factor 1 --partitions 1
+python tpchgen.py generate --scale-factor 1 --partitions 1
+python tpchgen.py convert --scale-factor 1 --partitions 1
 ```
+
 Data can be generated as multiple Parquet files per table by specifying `--partitions` greater than one. 
 
 ```shell
-python tpchgen.py --scale-factor 1000 --partitions 64
+python tpchgen.py generate --scale-factor 1000 --partitions 64
+python tpchgen.py convert --scale-factor 1000 --partitions 64
 ```
\ No newline at end of file
diff --git a/tpch/tpchgen.py b/tpch/tpchgen.py
index b7920bf..4fe04ae 100644
--- a/tpch/tpchgen.py
+++ b/tpch/tpchgen.py
@@ -29,25 +29,25 @@ table_names = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "
 all_schemas = {}
 
 all_schemas["customer"] = [
-    ("C_CUSTKEY", pyarrow.int32()),
+    ("C_CUSTKEY", pyarrow.int64()),
     ("C_NAME", pyarrow.string()),
     ("C_ADDRESS", pyarrow.string()),
-    ("C_NATIONKEY", pyarrow.int32()),
+    ("C_NATIONKEY", pyarrow.int64()),
     ("C_PHONE", pyarrow.string()),
-    ("C_ACCTBAL", pyarrow.decimal128(15, 2)),
+    ("C_ACCTBAL", pyarrow.decimal128(11, 2)),
     ("C_MKTSEGMENT", pyarrow.string()),
     ("C_COMMENT", pyarrow.string()),
 ]
 
 all_schemas["lineitem"] = [
-    ("L_ORDERKEY", pyarrow.int32()),
-    ("L_PARTKEY", pyarrow.int32()),
-    ("L_SUPPKEY", pyarrow.int32()),
+    ("L_ORDERKEY", pyarrow.int64()),
+    ("L_PARTKEY", pyarrow.int64()),
+    ("L_SUPPKEY", pyarrow.int64()),
     ("L_LINENUMBER", pyarrow.int32()),
-    ("L_QUANTITY", pyarrow.decimal128(15, 2)),
-    ("L_EXTENDEDPRICE", pyarrow.decimal128(15, 2)),
-    ("L_DISCOUNT", pyarrow.decimal128(15, 2)),
-    ("L_TAX", pyarrow.decimal128(15, 2)),
+    ("L_QUANTITY", pyarrow.decimal128(11, 2)),
+    ("L_EXTENDEDPRICE", pyarrow.decimal128(11, 2)),
+    ("L_DISCOUNT", pyarrow.decimal128(11, 2)),
+    ("L_TAX", pyarrow.decimal128(11, 2)),
     ("L_RETURNFLAG", pyarrow.string()),
     ("L_LINESTATUS", pyarrow.string()),
     ("L_SHIPDATE", pyarrow.date32()),
@@ -59,17 +59,17 @@ all_schemas["lineitem"] = [
 ]
 
 all_schemas["nation"] = [
-    ("N_NATIONKEY", pyarrow.int32()),
+    ("N_NATIONKEY", pyarrow.int64()),
     ("N_NAME", pyarrow.string()),
-    ("N_REGIONKEY", pyarrow.int32()),
+    ("N_REGIONKEY", pyarrow.int64()),
     ("N_COMMENT", pyarrow.string()),
 ]
 
 all_schemas["orders"] = [
-    ("O_ORDERKEY", pyarrow.int32()),
-    ("O_CUSTKEY", pyarrow.int32()),
+    ("O_ORDERKEY", pyarrow.int64()),
+    ("O_CUSTKEY", pyarrow.int64()),
     ("O_ORDERSTATUS", pyarrow.string()),
-    ("O_TOTALPRICE", pyarrow.decimal128(15, 2)),
+    ("O_TOTALPRICE", pyarrow.decimal128(11, 2)),
     ("O_ORDERDATE", pyarrow.date32()),
     ("O_ORDERPRIORITY", pyarrow.string()),
     ("O_CLERK", pyarrow.string()),
@@ -78,38 +78,38 @@ all_schemas["orders"] = [
 ]
 
 all_schemas["part"] = [
-    ("P_PARTKEY", pyarrow.int32()),
+    ("P_PARTKEY", pyarrow.int64()),
     ("P_NAME", pyarrow.string()),
     ("P_MFGR", pyarrow.string()),
     ("P_BRAND", pyarrow.string()),
     ("P_TYPE", pyarrow.string()),
     ("P_SIZE", pyarrow.int32()),
     ("P_CONTAINER", pyarrow.string()),
-    ("P_RETAILPRICE", pyarrow.decimal128(15, 2)),
+    ("P_RETAILPRICE", pyarrow.decimal128(11, 2)),
     ("P_COMMENT", pyarrow.string()),
 ]
 
 all_schemas["partsupp"] = [
-    ("PS_PARTKEY", pyarrow.int32()),
-    ("PS_SUPPKEY", pyarrow.int32()),
+    ("PS_PARTKEY", pyarrow.int64()),
+    ("PS_SUPPKEY", pyarrow.int64()),
     ("PS_AVAILQTY", pyarrow.int32()),
-    ("PS_SUPPLYCOST", pyarrow.decimal128(15, 2)),
+    ("PS_SUPPLYCOST", pyarrow.decimal128(11, 2)),
     ("PS_COMMENT", pyarrow.string()),
 ]
 
 all_schemas["region"] = [
-    ("R_REGIONKEY", pyarrow.int32()),
+    ("R_REGIONKEY", pyarrow.int64()),
     ("R_NAME", pyarrow.string()),
     ("R_COMMENT", pyarrow.string()),
 ]
 
 all_schemas["supplier"] = [
-    ("S_SUPPKEY", pyarrow.int32()),
+    ("S_SUPPKEY", pyarrow.int64()),
     ("S_NAME", pyarrow.string()),
     ("S_ADDRESS", pyarrow.string()),
-    ("S_NATIONKEY", pyarrow.int32()),
+    ("S_NATIONKEY", pyarrow.int64()),
     ("S_PHONE", pyarrow.string()),
-    ("S_ACCTBAL", pyarrow.decimal128(15, 2)),
+    ("S_ACCTBAL", pyarrow.decimal128(11, 2)),
     ("S_COMMENT", pyarrow.string()),
 ]
 
@@ -126,14 +126,14 @@ def convert_tbl_to_parquet(ctx: SessionContext, table: str, tbl_filename: str, f
     print(f"Converting {tbl_filename} to {parquet_filename} ...")
 
     # schema manipulation code copied from DataFusion Python tpch example
-    table_schema = [(r[0].lower(), r[1]) for r in all_schemas[table]]
+    table_schema = [pyarrow.field(r[0].lower(), r[1], nullable=False) for r in all_schemas[table]]
 
     # Pre-collect the output columns so we can ignore the null field we add
     # in to handle the trailing | in the file
-    output_cols = [r[0] for r in table_schema]
+    output_cols = [r.name for r in table_schema]
 
     # Trailing | requires extra field for in processing
-    table_schema.append(("some_null", pyarrow.null()))
+    table_schema.append(pyarrow.field("some_null", pyarrow.null(), nullable=True))
 
     schema = pyarrow.schema(table_schema)
 
@@ -146,14 +146,7 @@ def generate_tpch(scale_factor: int, partitions: int):
     if partitions == 1:
         command = f"docker run -v `pwd`/data:/data -t --rm ghcr.io/scalytics/tpch-docker:main -vf -s {scale_factor}"
         run_and_log_output(command, "/tmp/tpchgen.log")
-
-        # convert to parquet
-        ctx = SessionContext()
-        for table in table_names:
-            convert_tbl_to_parquet(ctx, table, f"data/{table}.tbl", "tbl", f"data/{table}.parquet")
-
     else:
-
         max_threads = os.cpu_count()
 
         # List of commands to run
@@ -174,8 +167,17 @@ def generate_tpch(scale_factor: int, partitions: int):
                 except Exception as e:
                     print(f"Command failed with exception: {e}")
 
+    end_time = time.time()
+    print(f"Generated CSV data in {round(end_time - start_time, 2)} seconds")
+
+def convert_tpch(scale_factor: int, partitions: int):
+    start_time = time.time()
+    ctx = SessionContext()
+    if partitions == 1:
         # convert to parquet
-        ctx = SessionContext()
+        for table in table_names:
+            convert_tbl_to_parquet(ctx, table, f"data/{table}.tbl", "tbl", f"data/{table}.parquet")
+    else:
         for table in table_names:
             run(f"mkdir -p data/{table}.parquet")
             if table == "nation" or table == "region":
@@ -184,13 +186,23 @@ def generate_tpch(scale_factor: int, partitions: int):
             else:
                 for part in range(1, partitions + 1):
                     convert_tbl_to_parquet(ctx, table, f"data/{table}.tbl.{part}", f"tbl.{part}", f"data/{table}.parquet/part{part}.parquet")
-
     end_time = time.time()
-    print(f"Finished in {round(end_time - start_time, 2)} seconds")
+    print(f"Converted CSV to Parquet in {round(end_time - start_time, 2)} seconds")
 
 if __name__ == '__main__':
     arg_parser = argparse.ArgumentParser()
-    arg_parser.add_argument('--scale-factor', type=int, help='The scale factor')
-    arg_parser.add_argument('--partitions', type=int, help='The number of partitions')
+    subparsers = arg_parser.add_subparsers(dest='command', help='Available commands')
+
+    parser_generate = subparsers.add_parser('generate', help='Generate TPC-H CSV Data')
+    parser_generate.add_argument('--scale-factor', type=int, help='The scale factor')
+    parser_generate.add_argument('--partitions', type=int, help='The number of partitions')
+
+    parser_convert = subparsers.add_parser('convert', help='Convert TPC-H CSV Data to Parquet')
+    parser_convert.add_argument('--scale-factor', type=int, help='The scale factor')
+    parser_convert.add_argument('--partitions', type=int, help='The number of partitions')
+
     args = arg_parser.parse_args()
-    generate_tpch(args.scale_factor, args.partitions)
\ No newline at end of file
+    if args.command == 'generate':
+        generate_tpch(args.scale_factor, args.partitions)
+    elif args.command == 'convert':
+        convert_tpch(args.scale_factor, args.partitions)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to