This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-benchmarks.git
The following commit(s) were added to refs/heads/main by this push:
new 97ee9f7 Use int64 for keys, make columns non-nullable (#4)
97ee9f7 is described below
commit 97ee9f7ad96a1403a16c675e8941883b827bf798
Author: Andy Grove <[email protected]>
AuthorDate: Thu May 30 17:45:38 2024 -0600
Use int64 for keys, make columns non-nullable (#4)
* Use int64 for keys, make columns non-nullable
* separate generate and convert commands
---
tpch/README.md | 7 +++--
tpch/tpchgen.py | 92 ++++++++++++++++++++++++++++++++-------------------------
2 files changed, 57 insertions(+), 42 deletions(-)
diff --git a/tpch/README.md b/tpch/README.md
index c1bede1..495dd2f 100644
--- a/tpch/README.md
+++ b/tpch/README.md
@@ -38,10 +38,13 @@ Data can be generated as a single Parquet file per table by specifying `--partit
Data will be generated into a `data` directory in the current working directory.
```shell
-python tpchgen.py --scale-factor 1 --partitions 1
+python tpchgen.py generate --scale-factor 1 --partitions 1
+python tpchgen.py convert --scale-factor 1 --partitions 1
```
+
Data can be generated as multiple Parquet files per table by specifying `--partitions` greater than one.
```shell
-python tpchgen.py --scale-factor 1000 --partitions 64
+python tpchgen.py generate --scale-factor 1000 --partitions 64
+python tpchgen.py convert --scale-factor 1000 --partitions 64
```
\ No newline at end of file
diff --git a/tpch/tpchgen.py b/tpch/tpchgen.py
index b7920bf..4fe04ae 100644
--- a/tpch/tpchgen.py
+++ b/tpch/tpchgen.py
@@ -29,25 +29,25 @@ table_names = ["customer", "lineitem", "nation", "orders", "part", "partsupp", "
all_schemas = {}
all_schemas["customer"] = [
- ("C_CUSTKEY", pyarrow.int32()),
+ ("C_CUSTKEY", pyarrow.int64()),
("C_NAME", pyarrow.string()),
("C_ADDRESS", pyarrow.string()),
- ("C_NATIONKEY", pyarrow.int32()),
+ ("C_NATIONKEY", pyarrow.int64()),
("C_PHONE", pyarrow.string()),
- ("C_ACCTBAL", pyarrow.decimal128(15, 2)),
+ ("C_ACCTBAL", pyarrow.decimal128(11, 2)),
("C_MKTSEGMENT", pyarrow.string()),
("C_COMMENT", pyarrow.string()),
]
all_schemas["lineitem"] = [
- ("L_ORDERKEY", pyarrow.int32()),
- ("L_PARTKEY", pyarrow.int32()),
- ("L_SUPPKEY", pyarrow.int32()),
+ ("L_ORDERKEY", pyarrow.int64()),
+ ("L_PARTKEY", pyarrow.int64()),
+ ("L_SUPPKEY", pyarrow.int64()),
("L_LINENUMBER", pyarrow.int32()),
- ("L_QUANTITY", pyarrow.decimal128(15, 2)),
- ("L_EXTENDEDPRICE", pyarrow.decimal128(15, 2)),
- ("L_DISCOUNT", pyarrow.decimal128(15, 2)),
- ("L_TAX", pyarrow.decimal128(15, 2)),
+ ("L_QUANTITY", pyarrow.decimal128(11, 2)),
+ ("L_EXTENDEDPRICE", pyarrow.decimal128(11, 2)),
+ ("L_DISCOUNT", pyarrow.decimal128(11, 2)),
+ ("L_TAX", pyarrow.decimal128(11, 2)),
("L_RETURNFLAG", pyarrow.string()),
("L_LINESTATUS", pyarrow.string()),
("L_SHIPDATE", pyarrow.date32()),
@@ -59,17 +59,17 @@ all_schemas["lineitem"] = [
]
all_schemas["nation"] = [
- ("N_NATIONKEY", pyarrow.int32()),
+ ("N_NATIONKEY", pyarrow.int64()),
("N_NAME", pyarrow.string()),
- ("N_REGIONKEY", pyarrow.int32()),
+ ("N_REGIONKEY", pyarrow.int64()),
("N_COMMENT", pyarrow.string()),
]
all_schemas["orders"] = [
- ("O_ORDERKEY", pyarrow.int32()),
- ("O_CUSTKEY", pyarrow.int32()),
+ ("O_ORDERKEY", pyarrow.int64()),
+ ("O_CUSTKEY", pyarrow.int64()),
("O_ORDERSTATUS", pyarrow.string()),
- ("O_TOTALPRICE", pyarrow.decimal128(15, 2)),
+ ("O_TOTALPRICE", pyarrow.decimal128(11, 2)),
("O_ORDERDATE", pyarrow.date32()),
("O_ORDERPRIORITY", pyarrow.string()),
("O_CLERK", pyarrow.string()),
@@ -78,38 +78,38 @@ all_schemas["orders"] = [
]
all_schemas["part"] = [
- ("P_PARTKEY", pyarrow.int32()),
+ ("P_PARTKEY", pyarrow.int64()),
("P_NAME", pyarrow.string()),
("P_MFGR", pyarrow.string()),
("P_BRAND", pyarrow.string()),
("P_TYPE", pyarrow.string()),
("P_SIZE", pyarrow.int32()),
("P_CONTAINER", pyarrow.string()),
- ("P_RETAILPRICE", pyarrow.decimal128(15, 2)),
+ ("P_RETAILPRICE", pyarrow.decimal128(11, 2)),
("P_COMMENT", pyarrow.string()),
]
all_schemas["partsupp"] = [
- ("PS_PARTKEY", pyarrow.int32()),
- ("PS_SUPPKEY", pyarrow.int32()),
+ ("PS_PARTKEY", pyarrow.int64()),
+ ("PS_SUPPKEY", pyarrow.int64()),
("PS_AVAILQTY", pyarrow.int32()),
- ("PS_SUPPLYCOST", pyarrow.decimal128(15, 2)),
+ ("PS_SUPPLYCOST", pyarrow.decimal128(11, 2)),
("PS_COMMENT", pyarrow.string()),
]
all_schemas["region"] = [
- ("R_REGIONKEY", pyarrow.int32()),
+ ("R_REGIONKEY", pyarrow.int64()),
("R_NAME", pyarrow.string()),
("R_COMMENT", pyarrow.string()),
]
all_schemas["supplier"] = [
- ("S_SUPPKEY", pyarrow.int32()),
+ ("S_SUPPKEY", pyarrow.int64()),
("S_NAME", pyarrow.string()),
("S_ADDRESS", pyarrow.string()),
- ("S_NATIONKEY", pyarrow.int32()),
+ ("S_NATIONKEY", pyarrow.int64()),
("S_PHONE", pyarrow.string()),
- ("S_ACCTBAL", pyarrow.decimal128(15, 2)),
+ ("S_ACCTBAL", pyarrow.decimal128(11, 2)),
("S_COMMENT", pyarrow.string()),
]
@@ -126,14 +126,14 @@ def convert_tbl_to_parquet(ctx: SessionContext, table: str, tbl_filename: str, f
print(f"Converting {tbl_filename} to {parquet_filename} ...")
# schema manipulation code copied from DataFusion Python tpch example
- table_schema = [(r[0].lower(), r[1]) for r in all_schemas[table]]
+ table_schema = [pyarrow.field(r[0].lower(), r[1], nullable=False) for r in
all_schemas[table]]
# Pre-collect the output columns so we can ignore the null field we add
# in to handle the trailing | in the file
- output_cols = [r[0] for r in table_schema]
+ output_cols = [r.name for r in table_schema]
# Trailing | requires extra field for in processing
- table_schema.append(("some_null", pyarrow.null()))
+ table_schema.append(pyarrow.field("some_null", pyarrow.null(), nullable=True))
schema = pyarrow.schema(table_schema)
@@ -146,14 +146,7 @@ def generate_tpch(scale_factor: int, partitions: int):
if partitions == 1:
command = f"docker run -v `pwd`/data:/data -t --rm
ghcr.io/scalytics/tpch-docker:main -vf -s {scale_factor}"
run_and_log_output(command, "/tmp/tpchgen.log")
-
- # convert to parquet
- ctx = SessionContext()
- for table in table_names:
- convert_tbl_to_parquet(ctx, table, f"data/{table}.tbl", "tbl", f"data/{table}.parquet")
-
else:
-
max_threads = os.cpu_count()
# List of commands to run
@@ -174,8 +167,17 @@ def generate_tpch(scale_factor: int, partitions: int):
except Exception as e:
print(f"Command failed with exception: {e}")
+ end_time = time.time()
+ print(f"Generated CSV data in {round(end_time - start_time, 2)} seconds")
+
+def convert_tpch(scale_factor: int, partitions: int):
+ start_time = time.time()
+ ctx = SessionContext()
+ if partitions == 1:
# convert to parquet
- ctx = SessionContext()
+ for table in table_names:
+ convert_tbl_to_parquet(ctx, table, f"data/{table}.tbl", "tbl", f"data/{table}.parquet")
+ else:
for table in table_names:
run(f"mkdir -p data/{table}.parquet")
if table == "nation" or table == "region":
@@ -184,13 +186,23 @@ def generate_tpch(scale_factor: int, partitions: int):
else:
for part in range(1, partitions + 1):
convert_tbl_to_parquet(ctx, table, f"data/{table}.tbl.{part}", f"tbl.{part}", f"data/{table}.parquet/part{part}.parquet")
-
end_time = time.time()
- print(f"Finished in {round(end_time - start_time, 2)} seconds")
+ print(f"Converted CSV to Parquet in {round(end_time - start_time, 2)}
seconds")
if __name__ == '__main__':
arg_parser = argparse.ArgumentParser()
- arg_parser.add_argument('--scale-factor', type=int, help='The scale factor')
- arg_parser.add_argument('--partitions', type=int, help='The number of partitions')
+ subparsers = arg_parser.add_subparsers(dest='command', help='Available commands')
+
+ parser_generate = subparsers.add_parser('generate', help='Generate TPC-H CSV Data')
+ parser_generate.add_argument('--scale-factor', type=int, help='The scale factor')
+ parser_generate.add_argument('--partitions', type=int, help='The number of partitions')
+
+ parser_convert = subparsers.add_parser('convert', help='Convert TPC-H CSV Data to Parquet')
+ parser_convert.add_argument('--scale-factor', type=int, help='The scale factor')
+ parser_convert.add_argument('--partitions', type=int, help='The number of partitions')
+
args = arg_parser.parse_args()
- generate_tpch(args.scale_factor, args.partitions)
\ No newline at end of file
+ if args.command == 'generate':
+ generate_tpch(args.scale_factor, args.partitions)
+ elif args.command == 'convert':
+ convert_tpch(args.scale_factor, args.partitions)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]