This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 97f95b3  Make `MemoryManager` and `MemoryStream` public (#1664)
97f95b3 is described below

commit 97f95b386b6f633488c85a7b0d3aba1ba38d7ac6
Author: Yijie Shen <[email protected]>
AuthorDate: Tue Jan 25 04:06:15 2022 +0800

    Make `MemoryManager` and `MemoryStream` public (#1664)
    
    * Make MemoryManager and MemoryStream public
    
    * lint
---
 ballista/rust/core/src/client.rs                   |  1 -
 .../core/src/execution_plans/shuffle_reader.rs     |  1 -
 .../core/src/execution_plans/shuffle_writer.rs     |  2 +-
 .../core/src/execution_plans/unresolved_shuffle.rs |  1 -
 ballista/rust/core/src/lib.rs                      |  1 -
 ballista/rust/core/src/memory_stream.rs            | 92 ----------------------
 ballista/rust/core/src/utils.rs                    |  1 -
 datafusion/src/execution/mod.rs                    |  2 +-
 datafusion/src/physical_plan/memory.rs             |  2 +-
 9 files changed, 3 insertions(+), 100 deletions(-)

diff --git a/ballista/rust/core/src/client.rs b/ballista/rust/core/src/client.rs
index 4b82c34..b40c878 100644
--- a/ballista/rust/core/src/client.rs
+++ b/ballista/rust/core/src/client.rs
@@ -25,7 +25,6 @@ use std::{
 };
 
 use crate::error::{ballista_error, BallistaError, Result};
-use crate::memory_stream::MemoryStream;
 use crate::serde::protobuf::{self};
 use crate::serde::scheduler::{
     Action, ExecutePartition, ExecutePartitionResult, PartitionId, PartitionStats,
diff --git a/ballista/rust/core/src/execution_plans/shuffle_reader.rs b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
index 4d401ec..ea3381d 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_reader.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_reader.rs
@@ -20,7 +20,6 @@ use std::sync::Arc;
 use std::{any::Any, pin::Pin};
 
 use crate::client::BallistaClient;
-use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::{PartitionLocation, PartitionStats};
 
 use crate::utils::WrappedStream;
diff --git a/ballista/rust/core/src/execution_plans/shuffle_writer.rs b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
index 0962615..6a6bc3e 100644
--- a/ballista/rust/core/src/execution_plans/shuffle_writer.rs
+++ b/ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -28,7 +28,6 @@ use std::time::Instant;
 use std::{any::Any, pin::Pin};
 
 use crate::error::BallistaError;
-use crate::memory_stream::MemoryStream;
 use crate::utils;
 
 use crate::serde::protobuf::ShuffleWritePartition;
@@ -47,6 +46,7 @@ use datafusion::error::{DataFusionError, Result};
 use datafusion::execution::runtime_env::RuntimeEnv;
 use datafusion::physical_plan::common::IPCWriter;
 use datafusion::physical_plan::hash_utils::create_hashes;
+use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::metrics::{
     self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
 };
diff --git a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
index 6de8dba..e14c1eb 100644
--- a/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
+++ b/ballista/rust/core/src/execution_plans/unresolved_shuffle.rs
@@ -18,7 +18,6 @@
 use std::sync::Arc;
 use std::{any::Any, pin::Pin};
 
-use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionLocation;
 
 use async_trait::async_trait;
diff --git a/ballista/rust/core/src/lib.rs b/ballista/rust/core/src/lib.rs
index 4e51067..bc7be4f 100644
--- a/ballista/rust/core/src/lib.rs
+++ b/ballista/rust/core/src/lib.rs
@@ -27,7 +27,6 @@ pub mod client;
 pub mod config;
 pub mod error;
 pub mod execution_plans;
-pub mod memory_stream;
 pub mod utils;
 
 #[macro_use]
diff --git a/ballista/rust/core/src/memory_stream.rs b/ballista/rust/core/src/memory_stream.rs
deleted file mode 100644
index 0c0ba4b..0000000
--- a/ballista/rust/core/src/memory_stream.rs
+++ /dev/null
@@ -1,92 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! This is copied from DataFusion because it is declared as `pub(crate)`. See
-//! https://issues.apache.org/jira/browse/ARROW-11276.
-
-use std::task::{Context, Poll};
-
-use datafusion::arrow::{datatypes::SchemaRef, error::Result, record_batch::RecordBatch};
-use datafusion::physical_plan::RecordBatchStream;
-use futures::Stream;
-
-/// Iterator over batches
-
-pub struct MemoryStream {
-    /// Vector of record batches
-    data: Vec<RecordBatch>,
-    /// Schema representing the data
-    schema: SchemaRef,
-    /// Optional projection for which columns to load
-    projection: Option<Vec<usize>>,
-    /// Index into the data
-    index: usize,
-}
-
-impl MemoryStream {
-    /// Create an iterator for a vector of record batches
-
-    pub fn try_new(
-        data: Vec<RecordBatch>,
-        schema: SchemaRef,
-        projection: Option<Vec<usize>>,
-    ) -> Result<Self> {
-        Ok(Self {
-            data,
-            schema,
-            projection,
-            index: 0,
-        })
-    }
-}
-
-impl Stream for MemoryStream {
-    type Item = Result<RecordBatch>;
-
-    fn poll_next(
-        mut self: std::pin::Pin<&mut Self>,
-        _: &mut Context<'_>,
-    ) -> Poll<Option<Self::Item>> {
-        Poll::Ready(if self.index < self.data.len() {
-            self.index += 1;
-
-            let batch = &self.data[self.index - 1];
-
-            // apply projection
-            let next_batch = match &self.projection {
-                Some(projection) => batch.project(projection)?,
-                None => batch.clone(),
-            };
-
-            Some(Ok(next_batch))
-        } else {
-            None
-        })
-    }
-
-    fn size_hint(&self) -> (usize, Option<usize>) {
-        (self.data.len(), Some(self.data.len()))
-    }
-}
-
-impl RecordBatchStream for MemoryStream {
-    /// Get the schema
-
-    fn schema(&self) -> SchemaRef {
-        self.schema.clone()
-    }
-}
diff --git a/ballista/rust/core/src/utils.rs b/ballista/rust/core/src/utils.rs
index e208461..efe175f 100644
--- a/ballista/rust/core/src/utils.rs
+++ b/ballista/rust/core/src/utils.rs
@@ -26,7 +26,6 @@ use crate::error::{BallistaError, Result};
 use crate::execution_plans::{
     DistributedQueryExec, ShuffleWriterExec, UnresolvedShuffleExec,
 };
-use crate::memory_stream::MemoryStream;
 use crate::serde::scheduler::PartitionStats;
 
 use crate::config::BallistaConfig;
diff --git a/datafusion/src/execution/mod.rs b/datafusion/src/execution/mod.rs
index ebc7c01..0c92627 100644
--- a/datafusion/src/execution/mod.rs
+++ b/datafusion/src/execution/mod.rs
@@ -20,6 +20,6 @@
 pub mod context;
 pub mod dataframe_impl;
 pub(crate) mod disk_manager;
-pub(crate) mod memory_manager;
+pub mod memory_manager;
 pub mod options;
 pub mod runtime_env;
diff --git a/datafusion/src/physical_plan/memory.rs b/datafusion/src/physical_plan/memory.rs
index 8e32b09..8e5f379 100644
--- a/datafusion/src/physical_plan/memory.rs
+++ b/datafusion/src/physical_plan/memory.rs
@@ -147,7 +147,7 @@ impl MemoryExec {
 }
 
 /// Iterator over batches
-pub(crate) struct MemoryStream {
+pub struct MemoryStream {
     /// Vector of record batches
     data: Vec<RecordBatch>,
     /// Schema representing the data

Reply via email to