This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 97f95b3 Make `MemoryManager` and `MemoryStream` public (#1664)
97f95b3 is described below
commit 97f95b386b6f633488c85a7b0d3aba1ba38d7ac6
Author: Yijie Shen <[email protected]>
AuthorDate: Tue Jan 25 04:06:15 2022 +0800
Make `MemoryManager` and `MemoryStream` public (#1664)
* Make MemoryManager and MemoryStream public
* lint
---
ballista/rust/core/src/client.rs | 1 -
.../core/src/execution_plans/shuffle_reader.rs | 1 -
.../core/src/execution_plans/shuffle_writer.rs | 2 +-
.../core/src/execution_plans/unresolved_shuffle.rs | 1 -
ballista/rust/core/src/lib.rs | 1 -
ballista/rust/core/src/memory_stream.rs | 92 ----------------------
ballista/rust/core/src/utils.rs | 1 -
datafusion/src/execution/mod.rs | 2 +-
datafusion/src/physical_plan/memory.rs | 2 +-
9 files changed, 3 insertions(+), 100 deletions(-)
diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index 4b82c34..b40c878 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -25,7 +25,6 @@ use std::{
};
use crate::error::{ballista_error, BallistaError, Result};
-use crate::memory_stream::MemoryStream;
use crate::serde::protobuf::{self};
use crate::serde::scheduler::{
Action, ExecutePartition, ExecutePartitionResult, PartitionId,
PartitionStats,
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 4d401ec..ea3381d 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
use std::{any::Any, pin::Pin};
use crate::client::BallistaClient;
-use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::{PartitionLocation, PartitionStats};
use crate::utils::WrappedStream;
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 0962615..6a6bc3e 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -28,7 +28,6 @@ use std::time::Instant;
use std::{any::Any, pin::Pin};
use crate::error::BallistaError;
-use crate::memory_stream::MemoryStream;
use crate::utils;
use crate::serde::protobuf::ShuffleWritePartition;
@@ -47,6 +46,7 @@ use datafusion::error::{DataFusionError, Result};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::common::IPCWriter;
use datafusion::physical_plan::hash_utils::create_hashes;
+use datafusion::physical_plan::memory::MemoryStream;
use datafusion::physical_plan::metrics::{
self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
};
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 6de8dba..e14c1eb 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -18,7 +18,6 @@
use std::sync::Arc;
use std::{any::Any, pin::Pin};
-use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionLocation;
use async_trait::async_trait;
diff --git a/ballista/rust/core/src/lib.rs b/ballista/rust/core/src/lib.rs
index 4e51067..bc7be4f 100644
--- a/ballista/rust/core/src/lib.rs
+++ b/ballista/rust/core/src/lib.rs
@@ -27,7 +27,6 @@ pub mod client;
pub mod config;
pub mod error;
pub mod execution_plans;
-pub mod memory_stream;
pub mod utils;
#[macro_use]
diff --git a/ballista/rust/core/src/memory_stream.rs b/ballista/rust/core/src/memory_stream.rs
deleted file mode 100644
index 0c0ba4b..0000000
--- a/ballista/rust/core/src/memory_stream.rs
+++ /dev/null
@@ -1,92 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! This is copied from DataFusion because it is declared as `pub(crate)`. See
-//! https://issues.apache.org/jira/browse/ARROW-11276.
-
-use std::task::{Context, Poll};
-
-use datafusion::arrow::{datatypes::SchemaRef, error::Result, record_batch::RecordBatch};
-use datafusion::physical_plan::RecordBatchStream;
-use futures::Stream;
-
-/// Iterator over batches
-
-pub struct MemoryStream {
- /// Vector of record batches
- data: Vec<RecordBatch>,
- /// Schema representing the data
- schema: SchemaRef,
- /// Optional projection for which columns to load
- projection: Option<Vec<usize>>,
- /// Index into the data
- index: usize,
-}
-
-impl MemoryStream {
- /// Create an iterator for a vector of record batches
-
- pub fn try_new(
- data: Vec<RecordBatch>,
- schema: SchemaRef,
- projection: Option<Vec<usize>>,
- ) -> Result<Self> {
- Ok(Self {
- data,
- schema,
- projection,
- index: 0,
- })
- }
-}
-
-impl Stream for MemoryStream {
- type Item = Result<RecordBatch>;
-
- fn poll_next(
- mut self: std::pin::Pin<&mut Self>,
- _: &mut Context<'_>,
- ) -> Poll<Option<Self::Item>> {
- Poll::Ready(if self.index < self.data.len() {
- self.index += 1;
-
- let batch = &self.data[self.index - 1];
-
- // apply projection
- let next_batch = match &self.projection {
- Some(projection) => batch.project(projection)?,
- None => batch.clone(),
- };
-
- Some(Ok(next_batch))
- } else {
- None
- })
- }
-
- fn size_hint(&self) -> (usize, Option<usize>) {
- (self.data.len(), Some(self.data.len()))
- }
-}
-
-impl RecordBatchStream for MemoryStream {
- /// Get the schema
-
- fn schema(&self) -> SchemaRef {
- self.schema.clone()
- }
-}
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index e208461..efe175f 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -26,7 +26,6 @@ use crate::error::{BallistaError, Result};
use crate::execution_plans::{
DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec,
};
-use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionStats;
use crate::config::BallistaConfig;
diff --git a/datafusion/src/execution/mod.rs b/datafusion/src/execution/mod.rs
index ebc7c01..0c92627 100644
--- a/datafusion/src/execution/mod.rs
+++ b/datafusion/src/execution/mod.rs
@@ -20,6 +20,6 @@
pub mod context;
pub mod dataframe_impl;
pub(crate) mod disk_manager;
-pub(crate) mod memory_manager;
+pub mod memory_manager;
pub mod options;
pub mod runtime_env;
diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs
index 8e32b09..8e5f379 100644
--- a/datafusion/src/physical_plan/memory.rs
+++ b/datafusion/src/physical_plan/memory.rs
@@ -147,7 +147,7 @@ impl MemoryExec {
}
/// Iterator over batches
-pub(crate) struct MemoryStream {
+pub struct MemoryStream {
/// Vector of record batches
data: Vec<RecordBatch>,
/// Schema representing the data