This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 8de2a76 minor: format the annotation (#2047)
8de2a76 is described below
commit 8de2a7610df33adf75ba515c4ee2fe935f7aa114
Author: jakevin <[email protected]>
AuthorDate: Mon Mar 21 21:55:59 2022 +0800
minor: format the annotation (#2047)
---
ballista/rust/core/src/error.rs | 2 +-
.../core/src/execution_plans/shuffle_writer.rs | 4 ++--
.../core/src/execution_plans/unresolved_shuffle.rs | 2 +-
ballista/rust/core/src/serde/logical_plan/mod.rs | 6 +++---
ballista/rust/executor/src/cpu_bound_executor.rs | 4 ++--
ballista/rust/executor/src/execution_loop.rs | 4 ++--
ballista/rust/executor/src/executor_server.rs | 6 +++---
ballista/rust/scheduler/src/main.rs | 2 +-
ballista/rust/scheduler/src/planner.rs | 2 +-
.../rust/scheduler/src/scheduler_server/grpc.rs | 2 +-
.../src/scheduler_server/query_stage_scheduler.rs | 2 +-
datafusion-expr/src/built_in_function.rs | 6 +++---
.../src/expressions/distinct_expressions.rs | 2 +-
datafusion-physical-expr/src/tdigest/mod.rs | 2 +-
datafusion-proto/src/from_proto.rs | 10 +++++-----
datafusion-proto/src/lib.rs | 22 +++++++++++-----------
datafusion/src/logical_plan/expr_rewriter.rs | 2 +-
.../src/optimizer/single_distinct_to_groupby.rs | 2 +-
.../src/physical_optimizer/coalesce_batches.rs | 6 +++---
datafusion/tests/sql/expr.rs | 2 +-
datafusion/tests/sql/mod.rs | 2 +-
21 files changed, 46 insertions(+), 46 deletions(-)
diff --git a/ballista/rust/core/src/error.rs b/ballista/rust/core/src/error.rs
index 64b20e3..ba6cdc7 100644
--- a/ballista/rust/core/src/error.rs
+++ b/ballista/rust/core/src/error.rs
@@ -40,7 +40,7 @@ pub enum BallistaError {
SqlError(parser::ParserError),
IoError(io::Error),
// ReqwestError(reqwest::Error),
- //HttpError(http::Error),
+ // HttpError(http::Error),
// KubeAPIError(kube::error::Error),
// KubeAPIRequestError(k8s_openapi::RequestError),
// KubeAPIResponseError(k8s_openapi::ResponseError),
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 5f4d67f..9058cc4 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -248,8 +248,8 @@ impl ShuffleWriterExec {
// write non-empty batch out
- //TODO optimize so we don't write or fetch empty partitions
- //if output_batch.num_rows() > 0 {
+ // TODO optimize so we don't write or fetch empty partitions
+ // if output_batch.num_rows() > 0 {
let timer = write_metrics.write_time.timer();
match &mut writers[output_partition] {
Some(w) => {
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 868620b..83d4598 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -74,7 +74,7 @@ impl ExecutionPlan for UnresolvedShuffleExec {
}
fn output_partitioning(&self) -> Partitioning {
- //TODO the output partition is known and should be populated here!
+ // TODO the output partition is known and should be populated here!
// see https://github.com/apache/arrow-datafusion/issues/758
Partitioning::UnknownPartitioning(self.output_partition_count)
}
diff --git a/ballista/rust/core/src/serde/logical_plan/mod.rs b/ballista/rust/core/src/serde/logical_plan/mod.rs
index fa155ce..2e95c17 100644
--- a/ballista/rust/core/src/serde/logical_plan/mod.rs
+++ b/ballista/rust/core/src/serde/logical_plan/mod.rs
@@ -704,8 +704,8 @@ impl AsLogicalPlan for LogicalPlanNode {
extension_codec,
)?;
- //Assumed common usize field was batch size
- //Used u64 to avoid any nastyness involving large values, most data clusters are probably uniformly 64 bits any ways
+ // Assumed common usize field was batch size
+ // Used u64 to avoid any nastyness involving large values, most data clusters are probably uniformly 64 bits any ways
use protobuf::repartition_node::PartitionMethod;
let pb_partition_method =
@@ -935,7 +935,7 @@ mod roundtrip_tests {
}
}
- //Given a identity of a LogicalPlan converts it to protobuf and back, using debug formatting to test equality.
+ // Given a identity of a LogicalPlan converts it to protobuf and back, using debug formatting to test equality.
macro_rules! roundtrip_test {
($initial_struct:ident, $proto_type:ty, $struct_type:ty) => {
let proto: $proto_type = (&$initial_struct).try_into()?;
diff --git a/ballista/rust/executor/src/cpu_bound_executor.rs b/ballista/rust/executor/src/cpu_bound_executor.rs
index 0bc2ba0..316d3d1 100644
--- a/ballista/rust/executor/src/cpu_bound_executor.rs
+++ b/ballista/rust/executor/src/cpu_bound_executor.rs
@@ -97,8 +97,8 @@ impl DedicatedExecutor {
let (tx, rx) = std::sync::mpsc::channel();
- //Cannot create a separated tokio runtime in another tokio runtime,
- //So use std::thread to spawn a thread
+ // Cannot create a separated tokio runtime in another tokio runtime,
+ // So use std::thread to spawn a thread
let thread = std::thread::spawn(move || {
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
diff --git a/ballista/rust/executor/src/execution_loop.rs b/ballista/rust/executor/src/execution_loop.rs
index 93e1884..833c06f 100644
--- a/ballista/rust/executor/src/execution_loop.rs
+++ b/ballista/rust/executor/src/execution_loop.rs
@@ -128,9 +128,9 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
let runtime = executor.ctx.runtime_env();
- //TODO get session_id from TaskDefinition
+ // TODO get session_id from TaskDefinition
let session_id = "mock_session".to_owned();
- //TODO get task_props from TaskDefinition
+ // TODO get task_props from TaskDefinition
let task_props = HashMap::new();
let task_context = Arc::new(TaskContext::new(
diff --git a/ballista/rust/executor/src/executor_server.rs b/ballista/rust/executor/src/executor_server.rs
index 7427399..1ad5f05 100644
--- a/ballista/rust/executor/src/executor_server.rs
+++ b/ballista/rust/executor/src/executor_server.rs
@@ -181,9 +181,9 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
let runtime = self.executor.ctx.runtime_env();
- //TODO get session_id from TaskDefinition
+ // TODO get session_id from TaskDefinition
let session_id = "mock_session".to_owned();
- //TODO get task_props from TaskDefinition
+ // TODO get task_props from TaskDefinition
let task_props = HashMap::new();
let task_context = Arc::new(TaskContext::new(
@@ -281,7 +281,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskRunnerPool<T,
info!("Starting the task runner pool");
// Use a dedicated executor for CPU bound tasks so that the main tokio
// executor can still answer requests even when under load
- //TODO make it configurable
+ // TODO make it configurable
let dedicated_executor = DedicatedExecutor::new("task_runner", 4);
loop {
if let Some(task) = rx_task.recv().await {
diff --git a/ballista/rust/scheduler/src/main.rs b/ballista/rust/scheduler/src/main.rs
index f35650c..e37f419 100644
--- a/ballista/rust/scheduler/src/main.rs
+++ b/ballista/rust/scheduler/src/main.rs
@@ -74,7 +74,7 @@ async fn start_server(
"Ballista v{} Scheduler listening on {:?}",
BALLISTA_VERSION, addr
);
- //should only call SchedulerServer::new() once in the process
+ // Should only call SchedulerServer::new() once in the process
info!(
"Starting Scheduler grpc server with task scheduling policy of {:?}",
policy
diff --git a/ballista/rust/scheduler/src/planner.rs b/ballista/rust/scheduler/src/planner.rs
index 8b5751a..3580e81 100644
--- a/ballista/rust/scheduler/src/planner.rs
+++ b/ballista/rust/scheduler/src/planner.rs
@@ -529,7 +529,7 @@ order by
let join_input_2 = join_input_2.children()[0].clone();
let unresolved_shuffle_reader_2 =
downcast_exec!(join_input_2, UnresolvedShuffleExec);
- assert_eq!(unresolved_shuffle_reader_2.input_partition_count, 1); //orders
+ assert_eq!(unresolved_shuffle_reader_2.input_partition_count, 1); // orders
assert_eq!(unresolved_shuffle_reader_2.output_partition_count, 2);
// final partitioned hash aggregate
diff --git a/ballista/rust/scheduler/src/scheduler_server/grpc.rs b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
index d6f95fb..428af6a 100644
--- a/ballista/rust/scheduler/src/scheduler_server/grpc.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/grpc.rs
@@ -289,7 +289,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> SchedulerGrpc
let file_format: Arc<dyn FileFormat> = match file_type {
FileType::Parquet => Ok(Arc::new(ParquetFormat::default())),
- //TODO implement for CSV
+ // TODO implement for CSV
_ => Err(tonic::Status::unimplemented(
"get_file_metadata unsupported file type",
)),
diff --git a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
index c29d7be..4b22ed1 100644
--- a/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
+++ b/ballista/rust/scheduler/src/scheduler_server/query_stage_scheduler.rs
@@ -435,7 +435,7 @@ fn get_job_status_from_tasks(
.map(|info| {
let mut partition_location = vec![];
for (status, executor_id, partitions) in info {
- let input_partition_id = status.task_id.as_ref().unwrap(); //TODO unwrap
+ let input_partition_id = status.task_id.as_ref().unwrap(); // TODO unwrap
let executor_meta = executors.get(executor_id).map(|e|
e.clone().into());
for shuffle_write_partition in partitions {
let shuffle_input_partition_id =
Some(protobuf::PartitionId {
diff --git a/datafusion-expr/src/built_in_function.rs b/datafusion-expr/src/built_in_function.rs
index 8762682..db4c509 100644
--- a/datafusion-expr/src/built_in_function.rs
+++ b/datafusion-expr/src/built_in_function.rs
@@ -168,7 +168,7 @@ impl BuiltinScalarFunction {
/// Returns the [Volatility] of the builtin function.
pub fn volatility(&self) -> Volatility {
match self {
- //Immutable scalar builtins
+ // Immutable scalar builtins
BuiltinScalarFunction::Abs => Volatility::Immutable,
BuiltinScalarFunction::Acos => Volatility::Immutable,
BuiltinScalarFunction::Asin => Volatility::Immutable,
@@ -231,10 +231,10 @@ impl BuiltinScalarFunction {
BuiltinScalarFunction::Upper => Volatility::Immutable,
BuiltinScalarFunction::RegexpMatch => Volatility::Immutable,
- //Stable builtin functions
+ // Stable builtin functions
BuiltinScalarFunction::Now => Volatility::Stable,
- //Volatile builtin functions
+ // Volatile builtin functions
BuiltinScalarFunction::Random => Volatility::Volatile,
}
}
diff --git a/datafusion-physical-expr/src/expressions/distinct_expressions.rs b/datafusion-physical-expr/src/expressions/distinct_expressions.rs
index c249ca8..e4f1e01 100644
--- a/datafusion-physical-expr/src/expressions/distinct_expressions.rs
+++ b/datafusion-physical-expr/src/expressions/distinct_expressions.rs
@@ -538,7 +538,7 @@ mod tests {
}};
}
- //Used trait to create associated constant for f32 and f64
+ // Used trait to create associated constant for f32 and f64
trait SubNormal: 'static {
const SUBNORMAL: Self;
}
diff --git a/datafusion-physical-expr/src/tdigest/mod.rs b/datafusion-physical-expr/src/tdigest/mod.rs
index 6c1b284..3f5cbfb 100644
--- a/datafusion-physical-expr/src/tdigest/mod.rs
+++ b/datafusion-physical-expr/src/tdigest/mod.rs
@@ -740,7 +740,7 @@ mod tests {
#[test]
fn test_centroid_addition_regression() {
- //https://github.com/MnO2/t-digest/pull/1
+ // https://github.com/MnO2/t-digest/pull/1
let vals = vec![1.0, 1.0, 1.0, 2.0, 1.0, 1.0];
let mut t = TDigest::new(10);
diff --git a/datafusion-proto/src/from_proto.rs b/datafusion-proto/src/from_proto.rs
index 8789129..1502f5f 100644
--- a/datafusion-proto/src/from_proto.rs
+++ b/datafusion-proto/src/from_proto.rs
@@ -515,13 +515,13 @@ impl TryFrom<&protobuf::scalar_type::Datatype> for DataType {
}
let field_type =
protobuf::PrimitiveScalarType::try_from(deepest_type)?.into();
- //Because length is checked above it is safe to unwrap .last()
+ // Because length is checked above it is safe to unwrap .last()
let mut scalar_type = DataType::List(Box::new(Field::new(
field_names.last().unwrap().as_str(),
field_type,
true,
)));
- //Iterate over field names in reverse order except for the last item in the vector
+ // Iterate over field names in reverse order except for the last item in the vector
for name in field_names.iter().rev().skip(1) {
let new_datatype = DataType::List(Box::new(Field::new(
name.as_str(),
@@ -751,12 +751,12 @@ impl TryFrom<&protobuf::ScalarListType> for DataType {
}
let mut curr_type = Self::List(Box::new(Field::new(
- //Since checked vector is not empty above this is safe to unwrap
+ // Since checked vector is not empty above this is safe to unwrap
field_names.last().unwrap(),
PrimitiveScalarType::try_from(deepest_type)?.into(),
true,
)));
- //Iterates over field names in reverse order except for the last item in the vector
+ // Iterates over field names in reverse order except for the last item in the vector
for name in field_names.iter().rev().skip(1) {
let temp_curr_type = Self::List(Box::new(Field::new(name,
curr_type, true)));
curr_type = temp_curr_type;
@@ -930,7 +930,7 @@ pub fn parse_expr(
.iter()
.map(|e| parse_expr(e, ctx))
.collect::<Result<Vec<_>, _>>()?,
- distinct: false, //TODO
+ distinct: false, // TODO
})
}
ExprType::Alias(alias) => Ok(Expr::Alias(
diff --git a/datafusion-proto/src/lib.rs b/datafusion-proto/src/lib.rs
index 0688215..a45dfb0 100644
--- a/datafusion-proto/src/lib.rs
+++ b/datafusion-proto/src/lib.rs
@@ -63,7 +63,7 @@ mod roundtrip_tests {
#[test]
fn scalar_values_error_serialization() {
let should_fail_on_seralize: Vec<ScalarValue> = vec![
- //Should fail due to inconsistent types
+ // Should fail due to inconsistent types
ScalarValue::List(
Some(Box::new(vec![
ScalarValue::Int16(None),
@@ -267,7 +267,7 @@ mod roundtrip_tests {
DataType::Time64(TimeUnit::Nanosecond),
DataType::Utf8,
DataType::LargeUtf8,
- //Recursive list tests
+ // Recursive list tests
DataType::List(new_box_field("Level1", DataType::Boolean, true)),
DataType::List(new_box_field(
"Level1",
@@ -279,7 +279,7 @@ mod roundtrip_tests {
let should_fail: Vec<DataType> = vec![
DataType::Null,
DataType::Float16,
- //Add more timestamp tests
+ // Add more timestamp tests
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Date64,
DataType::Time32(TimeUnit::Second),
@@ -300,7 +300,7 @@ mod roundtrip_tests {
DataType::FixedSizeBinary(-432),
DataType::LargeBinary,
DataType::Decimal(1345, 5431),
- //Recursive list tests
+ // Recursive list tests
DataType::List(new_box_field("Level1", DataType::Binary, true)),
DataType::List(new_box_field(
"Level1",
@@ -311,7 +311,7 @@ mod roundtrip_tests {
)),
true,
)),
- //Fixed size lists
+ // Fixed size lists
DataType::FixedSizeList(new_box_field("Level1", DataType::Binary,
true), 4),
DataType::FixedSizeList(
new_box_field(
@@ -325,7 +325,7 @@ mod roundtrip_tests {
),
41,
),
- //Struct Testing
+ // Struct Testing
DataType::Struct(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
@@ -426,7 +426,7 @@ mod roundtrip_tests {
DataType::Float16,
DataType::Float32,
DataType::Float64,
- //Add more timestamp tests
+ // Add more timestamp tests
DataType::Timestamp(TimeUnit::Millisecond, None),
DataType::Date32,
DataType::Date64,
@@ -452,7 +452,7 @@ mod roundtrip_tests {
DataType::Utf8,
DataType::LargeUtf8,
DataType::Decimal(1345, 5431),
- //Recursive list tests
+ // Recursive list tests
DataType::List(new_box_field("Level1", DataType::Binary, true)),
DataType::List(new_box_field(
"Level1",
@@ -463,7 +463,7 @@ mod roundtrip_tests {
)),
true,
)),
- //Fixed size lists
+ // Fixed size lists
DataType::FixedSizeList(new_box_field("Level1", DataType::Binary,
true), 4),
DataType::FixedSizeList(
new_box_field(
@@ -477,7 +477,7 @@ mod roundtrip_tests {
),
41,
),
- //Struct Testing
+ // Struct Testing
DataType::Struct(vec![
Field::new("nullable", DataType::Boolean, false),
Field::new("name", DataType::Utf8, false),
@@ -565,7 +565,7 @@ mod roundtrip_tests {
ScalarValue::Date32(None),
ScalarValue::TimestampMicrosecond(None, None),
ScalarValue::TimestampNanosecond(None, None),
- //ScalarValue::List(None, DataType::Boolean)
+ // ScalarValue::List(None, DataType::Boolean)
];
for test_case in test_types.into_iter() {
diff --git a/datafusion/src/logical_plan/expr_rewriter.rs b/datafusion/src/logical_plan/expr_rewriter.rs
index 9cf187e..d20b763 100644
--- a/datafusion/src/logical_plan/expr_rewriter.rs
+++ b/datafusion/src/logical_plan/expr_rewriter.rs
@@ -421,7 +421,7 @@ pub fn unnormalize_col(expr: Expr) -> Expr {
impl ExprRewriter for RemoveQualifier {
fn mutate(&mut self, expr: Expr) -> Result<Expr> {
if let Expr::Column(col) = expr {
- //let Column { relation: _, name } = col;
+ // let Column { relation: _, name } = col;
Ok(Expr::Column(Column {
relation: None,
name: col.name,
diff --git a/datafusion/src/optimizer/single_distinct_to_groupby.rs b/datafusion/src/optimizer/single_distinct_to_groupby.rs
index 4f01438..dfbefa6 100644
--- a/datafusion/src/optimizer/single_distinct_to_groupby.rs
+++ b/datafusion/src/optimizer/single_distinct_to_groupby.rs
@@ -121,7 +121,7 @@ fn optimize(plan: &LogicalPlan) -> Result<LogicalPlan> {
schema: final_agg_schema.clone(),
});
- //so the aggregates are displayed in the same way even after the rewrite
+ // so the aggregates are displayed in the same way even after the rewrite
let mut alias_expr: Vec<Expr> = Vec::new();
final_agg
.expressions()
diff --git a/datafusion/src/physical_optimizer/coalesce_batches.rs b/datafusion/src/physical_optimizer/coalesce_batches.rs
index 47d87d3..fbfca7b 100644
--- a/datafusion/src/physical_optimizer/coalesce_batches.rs
+++ b/datafusion/src/physical_optimizer/coalesce_batches.rs
@@ -53,7 +53,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
.collect::<Result<Vec<_>>>()?;
let plan_any = plan.as_any();
- //TODO we should do this in a more generic way either by wrapping all operators
+ // TODO we should do this in a more generic way either by wrapping all operators
// or having an API so that operators can declare when their inputs or outputs
// need to be wrapped in a coalesce batches operator.
// See https://issues.apache.org/jira/browse/ARROW-11068
@@ -61,7 +61,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
|| plan_any.downcast_ref::<HashJoinExec>().is_some()
|| plan_any.downcast_ref::<RepartitionExec>().is_some();
- //TODO we should also do this for HashAggregateExec but we need to update tests
+ // TODO we should also do this for HashAggregateExec but we need to update tests
// as part of this work - see https://issues.apache.org/jira/browse/ARROW-11068
// || plan_any.downcast_ref::<HashAggregateExec>().is_some();
@@ -71,7 +71,7 @@ impl PhysicalOptimizerRule for CoalesceBatches {
} else {
let plan = plan.with_new_children(children)?;
Ok(if wrap_in_coalesce {
- //TODO we should add specific configuration settings for coalescing batches and
+ // TODO we should add specific configuration settings for coalescing batches and
// we should do that once https://issues.apache.org/jira/browse/ARROW-11059 is
// implemented. For now, we choose half the configured batch size to avoid copies
// when a small number of rows are removed from a batch
diff --git a/datafusion/tests/sql/expr.rs b/datafusion/tests/sql/expr.rs
index f70c8da..d600c7b 100644
--- a/datafusion/tests/sql/expr.rs
+++ b/datafusion/tests/sql/expr.rs
@@ -726,7 +726,7 @@ async fn test_extract_date_part() -> Result<()> {
);
test_expression!("date_part('WEEK', CAST('2003-01-01' AS DATE))", "1");
- //TODO Creating logical plan for 'SELECT EXTRACT(WEEK FROM to_timestamp('2020-09-08T12:00:00+00:00'))'
+ // TODO Creating logical plan for 'SELECT EXTRACT(WEEK FROM to_timestamp('2020-09-08T12:00:00+00:00'))'
// SQL(ParserError("Expected date/time field, found: WEEK"))'
// will fix in sqlparser
diff --git a/datafusion/tests/sql/mod.rs b/datafusion/tests/sql/mod.rs
index cea85ba..05c5abd 100644
--- a/datafusion/tests/sql/mod.rs
+++ b/datafusion/tests/sql/mod.rs
@@ -660,7 +660,7 @@ where
let timestamps = vec![
1599572549190855000i64 / divisor, // 2020-09-08T13:42:29.190855+00:00
1599568949190855000 / divisor, // 2020-09-08T12:42:29.190855+00:00
- 1599565349190855000 / divisor, //2020-09-08T11:42:29.190855+00:00
+ 1599565349190855000 / divisor, // 2020-09-08T11:42:29.190855+00:00
]; // 2020-09-08T11:42:29.190855+00:00
let array = PrimitiveArray::<A>::from_vec(timestamps, tz);