Copilot commented on code in PR #19042:
URL: https://github.com/apache/datafusion/pull/19042#discussion_r2587988139


##########
benchmarks/src/clickbench.rs:
##########
@@ -78,6 +78,19 @@ pub struct RunOpt {
     /// If present, write results json here
     #[structopt(parse(from_os_str), short = "o", long = "output")]
     output_path: Option<PathBuf>,
+
+    /// Column name that the data is sorted by (e.g., "EventTime")
+    /// If specified, DataFusion will be informed that the data has this sort 
order
+    /// using CREATE EXTERNAL TABLE with WITH ORDER clause
+    ///
+    /// This is for `sorted_clickbench` benchmark's internal usage. If you 
want to specify

Review Comment:
   The comment mentions "sorted_clickbench" but the shell function is actually 
named "data_sorted_clickbench". This inconsistency could confuse users. 
Consider updating the comment to match the actual function name.
   ```suggestion
       /// This is for `data_sorted_clickbench` benchmark's internal usage. If 
you want to specify
   ```



##########
benchmarks/src/clickbench.rs:
##########
@@ -78,6 +78,19 @@ pub struct RunOpt {
     /// If present, write results json here
     #[structopt(parse(from_os_str), short = "o", long = "output")]
     output_path: Option<PathBuf>,
+
+    /// Column name that the data is sorted by (e.g., "EventTime")
+    /// If specified, DataFusion will be informed that the data has this sort 
order
+    /// using CREATE EXTERNAL TABLE with WITH ORDER clause
+    ///
+    /// This is for `sorted_clickbench` benchmark's internal usage. If you 
want to specify
+    /// a different order, make sure the dataset is generated with the same 
sort order.
+    #[structopt(long = "sorted-by")]
+    sorted_by: Option<String>,
+
+    /// Sort order: ASC or DESC (default: ASC)
+    #[structopt(long = "sort-order", default_value = "ASC")]
+    sort_order: String,

Review Comment:
   The `sort_order` parameter should be validated to ensure it's either "ASC" 
or "DESC". Currently, any value can be passed which could lead to invalid SQL 
generation. Consider adding validation or using an enum type in structopt.



##########
benchmarks/src/clickbench.rs:
##########
@@ -214,17 +253,54 @@ impl RunOpt {
     }
 
     /// Registers the `hits.parquet` as a table named `hits`
+    /// If sorted_by is specified, uses CREATE EXTERNAL TABLE with WITH ORDER
     async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
-        let options = Default::default();
         let path = self.path.as_os_str().to_str().unwrap();
-        ctx.register_parquet("hits", path, options)
-            .await
-            .map_err(|e| {
-                DataFusionError::Context(
-                    format!("Registering 'hits' as {path}"),
-                    Box::new(e),
-                )
-            })
+
+        // If sorted_by is specified, use CREATE EXTERNAL TABLE with WITH ORDER
+        if let Some(ref sort_column) = self.sorted_by {
+            println!(
+                "Registering table with sort order: {} {}",
+                sort_column, self.sort_order
+            );
+
+            // Escape column name with double quotes
+            let escaped_column = if sort_column.contains('"') {
+                sort_column.clone()
+            } else {
+                format!("\"{sort_column}\"")
+            };

Review Comment:
   The column name escaping logic is incomplete. If a column name already 
contains double quotes, simply cloning it won't properly escape them. Column 
names with quotes should have them doubled (e.g., `My"Column` should become 
`"My""Column"`). Consider using proper SQL identifier escaping.
   ```suggestion
               // Escape column name for SQL identifier: double embedded quotes 
and wrap in double quotes
               let escaped_column = format!("\"{}\"", sort_column.replace('"', 
"\"\""));
   ```



##########
benchmarks/src/clickbench.rs:
##########
@@ -214,17 +253,54 @@ impl RunOpt {
     }
 
     /// Registers the `hits.parquet` as a table named `hits`
+    /// If sorted_by is specified, uses CREATE EXTERNAL TABLE with WITH ORDER
     async fn register_hits(&self, ctx: &SessionContext) -> Result<()> {
-        let options = Default::default();
         let path = self.path.as_os_str().to_str().unwrap();
-        ctx.register_parquet("hits", path, options)
-            .await
-            .map_err(|e| {
-                DataFusionError::Context(
-                    format!("Registering 'hits' as {path}"),
-                    Box::new(e),
-                )
-            })
+
+        // If sorted_by is specified, use CREATE EXTERNAL TABLE with WITH ORDER
+        if let Some(ref sort_column) = self.sorted_by {
+            println!(
+                "Registering table with sort order: {} {}",
+                sort_column, self.sort_order
+            );
+
+            // Escape column name with double quotes
+            let escaped_column = if sort_column.contains('"') {
+                sort_column.clone()
+            } else {
+                format!("\"{sort_column}\"")
+            };
+
+            // Build CREATE EXTERNAL TABLE DDL with WITH ORDER clause
+            // Schema will be automatically inferred from the Parquet file
+            let create_table_sql = format!(
+                "CREATE EXTERNAL TABLE hits \
+                 STORED AS PARQUET \
+                 LOCATION '{}' \
+                 WITH ORDER ({} {})",
+                path,
+                escaped_column,
+                self.sort_order.to_uppercase()

Review Comment:
   The `to_uppercase()` call is applied to the user-provided `sort_order` 
without validation. If an invalid value is provided (e.g., "invalid"), this 
will generate invalid SQL. This validation should happen earlier, ideally when 
parsing command-line arguments.



##########
benchmarks/sort_clickbench.py:
##########
@@ -0,0 +1,254 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Sort ClickBench data by EventTime for reverse scan benchmark.
+Enhanced version with configurable row group size and optimization options.
+"""
+
+import sys
+import argparse
+from pathlib import Path
+
+try:
+    import pyarrow.parquet as pq
+    import pyarrow.compute as pc
+except ImportError:
+    print("Error: pyarrow is not installed.")
+    print("Please install it with: pip install pyarrow")
+    sys.exit(1)
+
+
+def sort_clickbench_data(
+        input_path: str,
+        output_path: str,
+        row_group_size: int = 64 * 1024,  # 64k rows default
+        compression: str = 'zstd',
+        compression_level: int = 3,
+        verify: bool = True
+):
+    """Sort parquet file by EventTime column with optimized settings."""
+
+    input_file = Path(input_path)
+    output_file = Path(output_path)
+
+    if not input_file.exists():
+        print(f"Error: Input file not found: {input_file}")
+        sys.exit(1)
+
+    if output_file.exists():
+        print(f"Sorted file already exists: {output_file}")
+        if verify:
+            verify_sorted_file(output_file)
+        return
+
+    try:
+        print(f"Reading {input_file.name}...")
+        table = pq.read_table(str(input_file))
+
+        print(f"Original table has {len(table):,} rows")
+        print("Sorting by EventTime...")
+
+        # Sort the table by EventTime
+        sorted_indices = pc.sort_indices(table, sort_keys=[("EventTime", 
"ascending")])
+        sorted_table = pc.take(table, sorted_indices)
+
+        print(f"Sorted table has {len(sorted_table):,} rows")
+
+        # Verify sort
+        event_times = sorted_table.column('EventTime').to_pylist()
+        if event_times and verify:
+            print(f"First EventTime: {event_times[0]}")
+            print(f"Last EventTime: {event_times[-1]}")
+            # Verify ascending order
+            is_sorted = all(event_times[i] <= event_times[i+1] for i in 
range(min(1000, len(event_times)-1)))
+            print(f"Sort verification (first 1000 rows): {'✓ PASS' if 
is_sorted else '✗ FAIL'}")
+
+        print(f"Writing sorted file to {output_file}...")
+        print(f"  Row group size: {row_group_size:,} rows")
+        print(f"  Compression: {compression}")
+
+        # Write sorted table with optimized settings
+        pq.write_table(
+            sorted_table,
+            str(output_file),
+            compression=compression,
+            use_dictionary=True,
+            write_statistics=True,
+            # Optimize row group size for better performance
+            row_group_size=row_group_size,
+            # Set data page size (1MB is good for most cases)
+            data_page_size=1024 * 1024,
+            # Use v2 data page format for better compression
+            use_deprecated_int96_timestamps=False,
+            coerce_timestamps='us',  # Use microsecond precision
+            # Batch size for writing
+            write_batch_size=min(row_group_size, 1024 * 64),
+            # Enable compression for all columns
+            compression_level=compression_level,  # Use default compression 
level
+        )
+
+        # Report results
+        input_size_mb = input_file.stat().st_size / (1024**2)
+        output_size_mb = output_file.stat().st_size / (1024**2)
+
+        # Read metadata to verify row groups
+        parquet_file = pq.ParquetFile(str(output_file))
+        num_row_groups = parquet_file.num_row_groups
+
+        print("\n✓ Successfully created sorted file!")
+        print(f"  Input:  {input_size_mb:.1f} MB")
+        print(f"  Output: {output_size_mb:.1f} MB")
+        print(f"  Compression ratio: {input_size_mb/output_size_mb:.2f}x")

Review Comment:
   Potential division by zero: if `output_size_mb` is 0 (empty or very small 
file), this will raise a `ZeroDivisionError`. Add a check before the division 
or handle the case where the output file is empty.
   ```suggestion
           if output_size_mb > 0:
               print(f"  Compression ratio: 
{input_size_mb/output_size_mb:.2f}x")
           else:
               print("  Compression ratio: N/A (output file size is zero)")
   ```



##########
benchmarks/bench.sh:
##########
@@ -1197,6 +1206,80 @@ compare_benchmarks() {
 
 }
 
+# Creates sorted ClickBench data from hits_0.parquet (partitioned dataset)
+# The data is sorted by EventTime in ascending order
+# Using hits_0.parquet (~150MB) instead of full hits.parquet (~14GB) for 
faster testing
+data_sorted_clickbench() {
+    SORTED_FILE="${DATA_DIR}/hits_0_sorted.parquet"
+    ORIGINAL_FILE="${DATA_DIR}/hits_partitioned/hits_0.parquet"
+
+    echo "Creating sorted ClickBench dataset from hits_0.parquet..."
+
+    # Check if partitioned data exists
+    if [ ! -f "${ORIGINAL_FILE}" ]; then
+        echo "hits_partitioned/hits_0.parquet not found. Running 
data_clickbench_partitioned first..."
+        data_clickbench_partitioned
+    fi
+
+    # Check if sorted file already exists
+    if [ -f "${SORTED_FILE}" ]; then
+        echo "Sorted hits_0.parquet already exists at ${SORTED_FILE}"
+        return 0
+    fi
+
+    echo "Sorting hits_0.parquet by EventTime (this takes ~10 seconds)..."
+
+    # Ensure virtual environment exists and has pyarrow
+    if [ ! -d "$VIRTUAL_ENV" ]; then
+        echo "Creating virtual environment at $VIRTUAL_ENV..."
+        python3 -m venv "$VIRTUAL_ENV"
+    fi
+
+    # Activate virtual environment
+    source "$VIRTUAL_ENV/bin/activate"
+
+    # Check and install pyarrow if needed
+    if ! python3 -c "import pyarrow" 2>/dev/null; then
+        echo "Installing pyarrow (this may take a minute)..."
+        pip install --quiet pyarrow
+    fi
+
+    # Use the standalone Python script to sort
+    python3 "${SCRIPT_DIR}"/sort_clickbench.py "${ORIGINAL_FILE}" 
"${SORTED_FILE}"
+    local result=$?
+
+    # Deactivate virtual environment
+    deactivate

Review Comment:
   The `deactivate` command may fail or not exist if the virtual environment 
activation failed. This should be wrapped in a check to prevent script errors. 
Consider using `if command -v deactivate &> /dev/null; then deactivate; fi` or 
checking if the activation was successful before deactivating.
   ```suggestion
       if command -v deactivate &> /dev/null; then deactivate; fi
   ```



##########
benchmarks/bench.sh:
##########
@@ -99,6 +99,9 @@ clickbench_partitioned: ClickBench queries against 
partitioned (100 files) parqu
 clickbench_pushdown:    ClickBench queries against partitioned (100 files) 
parquet w/ filter_pushdown enabled
 clickbench_extended:    ClickBench \"inspired\" queries against a single 
parquet (DataFusion specific)
 
+# Sorted Data Benchmarks (ORDER BY Optimization)
+data_sorted_clickbench:     ClickBench queries on pre-sorted data WITH sort 
order info (tests sort elimination optimization)
+

Review Comment:
   The new `data_sorted_clickbench` benchmark is not documented in the 
README.md file. Users won't know how to use this new feature without 
documentation. Consider adding a section describing how to generate sorted data 
and run the sorted data benchmark, similar to how other benchmarks are 
documented.
   ```suggestion
   
   # Documentation:
   # To use the data_sorted_clickbench benchmark:
   # 1. Generate sorted ClickBench data with sort order information:
   #    Run: ./bench.sh generate_sorted_clickbench_data
   #    This will create the required sorted parquet files in the data 
directory.
   # 2. Run the benchmark:
   #    Run: ./bench.sh data_sorted_clickbench
   #    This will execute the ClickBench queries on the pre-sorted data and 
test sort elimination optimization.
   # For more details, see the README.md file.
   ```



##########
benchmarks/sort_clickbench.py:
##########
@@ -0,0 +1,254 @@
+#!/usr/bin/env python3
+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Sort ClickBench data by EventTime for reverse scan benchmark.
+Enhanced version with configurable row group size and optimization options.
+"""
+
+import sys
+import argparse
+from pathlib import Path
+
+try:
+    import pyarrow.parquet as pq
+    import pyarrow.compute as pc
+except ImportError:
+    print("Error: pyarrow is not installed.")
+    print("Please install it with: pip install pyarrow")
+    sys.exit(1)
+
+
+def sort_clickbench_data(
+        input_path: str,
+        output_path: str,
+        row_group_size: int = 64 * 1024,  # 64k rows default
+        compression: str = 'zstd',
+        compression_level: int = 3,
+        verify: bool = True
+):
+    """Sort parquet file by EventTime column with optimized settings."""
+
+    input_file = Path(input_path)
+    output_file = Path(output_path)
+
+    if not input_file.exists():
+        print(f"Error: Input file not found: {input_file}")
+        sys.exit(1)
+
+    if output_file.exists():
+        print(f"Sorted file already exists: {output_file}")
+        if verify:
+            verify_sorted_file(output_file)
+        return
+
+    try:
+        print(f"Reading {input_file.name}...")
+        table = pq.read_table(str(input_file))
+
+        print(f"Original table has {len(table):,} rows")
+        print("Sorting by EventTime...")
+
+        # Sort the table by EventTime
+        sorted_indices = pc.sort_indices(table, sort_keys=[("EventTime", 
"ascending")])
+        sorted_table = pc.take(table, sorted_indices)
+
+        print(f"Sorted table has {len(sorted_table):,} rows")
+
+        # Verify sort
+        event_times = sorted_table.column('EventTime').to_pylist()
+        if event_times and verify:
+            print(f"First EventTime: {event_times[0]}")
+            print(f"Last EventTime: {event_times[-1]}")
+            # Verify ascending order
+            is_sorted = all(event_times[i] <= event_times[i+1] for i in 
range(min(1000, len(event_times)-1)))
+            print(f"Sort verification (first 1000 rows): {'✓ PASS' if 
is_sorted else '✗ FAIL'}")
+
+        print(f"Writing sorted file to {output_file}...")
+        print(f"  Row group size: {row_group_size:,} rows")
+        print(f"  Compression: {compression}")
+
+        # Write sorted table with optimized settings
+        pq.write_table(
+            sorted_table,
+            str(output_file),
+            compression=compression,
+            use_dictionary=True,
+            write_statistics=True,
+            # Optimize row group size for better performance
+            row_group_size=row_group_size,
+            # Set data page size (1MB is good for most cases)
+            data_page_size=1024 * 1024,
+            # Use v2 data page format for better compression
+            use_deprecated_int96_timestamps=False,
+            coerce_timestamps='us',  # Use microsecond precision
+            # Batch size for writing
+            write_batch_size=min(row_group_size, 1024 * 64),
+            # Enable compression for all columns
+            compression_level=compression_level,  # Use default compression 
level
+        )
+
+        # Report results
+        input_size_mb = input_file.stat().st_size / (1024**2)
+        output_size_mb = output_file.stat().st_size / (1024**2)
+
+        # Read metadata to verify row groups
+        parquet_file = pq.ParquetFile(str(output_file))
+        num_row_groups = parquet_file.num_row_groups
+
+        print("\n✓ Successfully created sorted file!")
+        print(f"  Input:  {input_size_mb:.1f} MB")
+        print(f"  Output: {output_size_mb:.1f} MB")
+        print(f"  Compression ratio: {input_size_mb/output_size_mb:.2f}x")
+        print("\nRow Group Statistics:")
+        print(f"  Total row groups: {num_row_groups}")
+        print(f"  Total rows: {len(sorted_table):,}")
+
+        # Show row group details
+        for i in range(min(3, num_row_groups)):
+            rg_metadata = parquet_file.metadata.row_group(i)
+            print(f"  Row group {i}: {rg_metadata.num_rows:,} rows, 
{rg_metadata.total_byte_size / 1024**2:.1f} MB")
+
+        if num_row_groups > 3:
+            print(f"  ... and {num_row_groups - 3} more row groups")
+
+        avg_rows_per_group = len(sorted_table) / num_row_groups if 
num_row_groups > 0 else 0

Review Comment:
   The division by zero check `if num_row_groups > 0` is done after the 
division, making it ineffective. This will cause a ZeroDivisionError if 
`num_row_groups` is 0. Move the check before the division: `avg_rows_per_group 
= len(sorted_table) / num_row_groups if num_row_groups > 0 else 0`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to