timsaucer commented on code in PR #750: URL: https://github.com/apache/datafusion-python/pull/750#discussion_r1675870247
########## python/datafusion/context.py: ########## @@ -0,0 +1,1167 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +from __future__ import annotations + +from ._internal import SessionConfig as SessionConfigInternal +from ._internal import RuntimeConfig as RuntimeConfigInternal +from ._internal import SQLOptions as SQLOptionsInternal +from ._internal import SessionContext as SessionContextInternal +from ._internal import LogicalPlan, ExecutionPlan # TODO MAKE THIS A DEFINED CLASS + +from datafusion._internal import AggregateUDF +from datafusion.catalog import Catalog, Table +from datafusion.dataframe import DataFrame +from datafusion.expr import Expr +from datafusion.record_batch import RecordBatchStream +from datafusion.udf import ScalarUDF + +from typing import Any, TYPE_CHECKING +from typing_extensions import deprecated + +if TYPE_CHECKING: + import pyarrow + import pandas + import polars + + +class SessionConfig: + def __init__(self, config_options: dict[str, str] = {}) -> None: + """Create a new `SessionConfig` with the given configuration options. + + Parameters + ---------- + config_options : dict[str, str] + Configuration options. 
+ """ + self.config_internal = SessionConfigInternal(config_options) + + def with_create_default_catalog_and_schema( + self, enabled: bool = True + ) -> SessionConfig: + """Control whether the default catalog and schema will be automatically created. + + Parameters + ---------- + enabled : bool + Whether the default catalog and schema will be automatically created. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = ( + self.config_internal.with_create_default_catalog_and_schema(enabled) + ) + return self + + def with_default_catalog_and_schema( + self, catalog: str, schema: str + ) -> SessionConfig: + """Select a name for the default catalog and schema. + + Parameters + ---------- + catalog : str + Catalog name. + schema : str + Schema name. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = self.config_internal.with_default_catalog_and_schema( + catalog, schema + ) + return self + + def with_information_schema(self, enabled: bool = True) -> SessionConfig: + """Enable or disable the inclusion of `information_schema` virtual tables. + + Parameters + ---------- + enabled : bool + Whether to include `information_schema` virtual tables. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = self.config_internal.with_information_schema(enabled) + return self + + def with_batch_size(self, batch_size: int) -> SessionConfig: + """Customize batch size. + + Parameters + ---------- + batch_size : int + Batch size. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = self.config_internal.with_batch_size(batch_size) + return self + + def with_target_partitions(self, target_partitions: int) -> SessionConfig: + """Customize the number of target partitions for query execution.
+ + Increasing partitions can increase concurrency. + + Parameters + ---------- + target_partitions : int + Number of target partitions. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = self.config_internal.with_target_partitions( + target_partitions + ) + return self + + def with_repartition_aggregations(self, enabled: bool = True) -> SessionConfig: + """Enable or disable the use of repartitioning for aggregations. + + Enabling this improves parallelism. + + Parameters + ---------- + enabled : bool + Whether to use repartitioning for aggregations. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = self.config_internal.with_repartition_aggregations( + enabled + ) + return self + + def with_repartition_joins(self, enabled: bool = True) -> SessionConfig: + """Enable or disable the use of repartitioning for joins to improve parallelism. + + Parameters + ---------- + enabled : bool + Whether to use repartitioning for joins. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = self.config_internal.with_repartition_joins(enabled) + return self + + def with_repartition_windows(self, enabled: bool = True) -> SessionConfig: + """Enable or disable the use of repartitioning for window functions to improve parallelism. + + Parameters + ---------- + enabled : bool + Whether to use repartitioning for window functions. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = self.config_internal.with_repartition_windows(enabled) + return self + + def with_repartition_sorts(self, enabled: bool = True) -> SessionConfig: + """Enable or disable the use of repartitioning for sorts to improve parallelism.
+ + Parameters + ---------- + enabled : bool + Whether to use repartitioning for sorts. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = self.config_internal.with_repartition_sorts(enabled) + return self + + def with_repartition_file_scans(self, enabled: bool = True) -> SessionConfig: + """Enable or disable the use of repartitioning for file scans. + + Parameters + ---------- + enabled : bool + Whether to use repartitioning for file scans. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = self.config_internal.with_repartition_file_scans(enabled) + return self + + def with_repartition_file_min_size(self, size: int) -> SessionConfig: + """Set minimum file range size for repartitioning scans. + + Parameters + ---------- + size : int + Minimum file range size. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = self.config_internal.with_repartition_file_min_size(size) + return self + + def with_parquet_pruning(self, enabled: bool = True) -> SessionConfig: + """Enable or disable the use of pruning predicate for parquet readers to skip row groups. + + Parameters + ---------- + enabled : bool + Whether to use pruning predicate for parquet readers. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting. + """ + self.config_internal = self.config_internal.with_parquet_pruning(enabled) + return self + + def set(self, key: str, value: str) -> SessionConfig: + """Set a configuration option. + + Parameters + ---------- + key : str + Option key. + value : str + Option value. + + Returns + ------- + SessionConfig + A new `SessionConfig` object with the updated setting.
+ """ + self.config_internal = self.config_internal.set(key, value) + return self + + +class RuntimeConfig: + def __init__(self) -> None: + """Create a new `RuntimeConfig` with default values.""" + self.config_internal = RuntimeConfigInternal() + + def with_disk_manager_disabled(self) -> RuntimeConfig: + """Disable the disk manager, attempts to create temporary files will error. + + Returns + ------- + RuntimeConfig + A new `RuntimeConfig` object with the updated setting. + + Examples + -------- + >>> config = RuntimeConfig().with_disk_manager_disabled() + """ + self.config_internal = self.config_internal.with_disk_manager_disabled() + return self + + def with_disk_manager_os(self) -> RuntimeConfig: + """Use the operating system's temporary directory for disk manager. + + Returns + ------- + RuntimeConfig + A new `RuntimeConfig` object with the updated setting. + + Examples + -------- + >>> config = RuntimeConfig().with_disk_manager_os() + """ + self.config_internal = self.config_internal.with_disk_manager_os() + return self + + def with_disk_manager_specified(self, paths: list[str]) -> RuntimeConfig: + """Use the specified paths for the disk manager's temporary files. + + Parameters + ---------- + paths : list[str] + Paths to use for the disk manager's temporary files. + + Returns + ------- + RuntimeConfig + A new `RuntimeConfig` object with the updated setting. + + Examples + -------- + >>> config = RuntimeConfig().with_disk_manager_specified(["/tmp"]) + """ + self.config_internal = self.config_internal.with_disk_manager_specified(paths) + return self + + def with_unbounded_memory_pool(self) -> RuntimeConfig: + """Use an unbounded memory pool. + + Returns + ------- + RuntimeConfig + A new `RuntimeConfig` object with the updated setting. 
+ + Examples + -------- + >>> config = RuntimeConfig().with_unbounded_memory_pool() + """ + self.config_internal = self.config_internal.with_unbounded_memory_pool() + return self + + def with_fair_spill_pool(self, size: int) -> RuntimeConfig: + """Use a fair spill pool with the specified size. + + This pool works best when you know beforehand the query has multiple spillable + operators that will likely all need to spill. Sometimes it will cause spills + even when there was sufficient memory (reserved for other operators) to avoid + doing so. + + ```text + ┌───────────────────────z──────────────────────z───────────────┐ + │ z z │ + │ z z │ + │ Spillable z Unspillable z Free │ + │ Memory z Memory z Memory │ + │ z z │ + │ z z │ + └───────────────────────z──────────────────────z───────────────┘ + ``` + + Parameters + ---------- + size : int + Size of the memory pool in bytes. + + Returns + ------- + RuntimeConfig + A new `RuntimeConfig` object with the updated setting. + + Examples + -------- + ```python + >>> config = RuntimeConfig().with_fair_spill_pool(1024) + ``` + """ + self.config_internal = self.config_internal.with_fair_spill_pool(size) + return self + + def with_greedy_memory_pool(self, size: int) -> RuntimeConfig: + """Use a greedy memory pool with the specified size. + + This pool works well for queries that do not need to spill or have a single + spillable operator. See `RuntimeConfig.with_fair_spill_pool` if there are + multiple spillable operators that all will spill. + + Parameters + ---------- + size : int + Size of the memory pool in bytes. + + Returns + ------- + RuntimeConfig + A new `RuntimeConfig` object with the updated setting. 
+ + Examples + -------- + >>> config = RuntimeConfig().with_greedy_memory_pool(1024) + """ + self.config_internal = self.config_internal.with_greedy_memory_pool(size) + return self + + def with_temp_file_path(self, path: str) -> RuntimeConfig: Review Comment: Added, but not resolving this comment yet until unit tests are also added -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org