eerhardt commented on a change in pull request #12068: URL: https://github.com/apache/arrow/pull/12068#discussion_r780371694
########## File path: csharp/examples/IoTDataPipelineExample/Program.cs ########## @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.IO; +using System.Collections.Generic; +using System.Threading.Tasks; +using Apache.Arrow.Ipc; +using Apache.Arrow; + +namespace IoTPipelineExample +{ + class Program + { + public static int concurrencyLevel = 8; + public static int totalInputs = 10_000; + public static int queueCapacity = 1_000_000; + + public static async Task Main(string[] args) + { + SensorDataPipeline sdp = new SensorDataPipeline(totalInputs, queueCapacity); + List<Task> tasks = new List<Task>(); + Dictionary<string, List<RecordBatch>> recordBatchDict = new Dictionary<string, List<RecordBatch>>(); + + Console.WriteLine("Producing IoT sensor data..."); + for (int i = 0; i < concurrencyLevel; i++) + { + int j = i; + Task t = Task.Run(() => sdp.WriteToChannel(j)); + tasks.Add(t); + } + + Console.WriteLine("Consuming IoT sensor data..."); + tasks.Add(Task.Run(() => sdp.ReadFromChannel())); + + Console.WriteLine("Waiting for all tasks to complete..."); + Task.WaitAll(tasks.ToArray()); + + Console.WriteLine("Persisting data to disk..."); + var arrowDataPath = await 
sdp.PersistData(); + //string currentPath = Directory.GetCurrentDirectory(); + //string arrowDataPath = Path.Combine(currentPath, "arrow"); Review comment: Are these necessary? They just seem to be clutter. ```suggestion ``` ########## File path: csharp/examples/IoTDataPipelineExample/Model/SensorDataPipeline.cs ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.IO; +using System.Threading.Tasks; +using System.Collections.Generic; +using Apache.Arrow; +using Apache.Arrow.Ipc; +using Apache.Arrow.Memory; +using System.Threading.Channels; +using System.Threading; +using Apache.Arrow.Types; + +namespace IoTPipelineExample +{ + public class SensorDataPipeline + { + private int _size; + private readonly int _totalInputs; + private readonly int _queueCapacity; + private readonly Channel<SensorData> _channel; + ChannelWriter<SensorData> _writer; + ChannelReader<SensorData> _reader; + + private readonly Dictionary<int, Int32Array.Builder> _colSubjectIdBuilderDict; + private readonly Dictionary<int, StringArray.Builder> _colActivityLabelBuilderDict; + private readonly Dictionary<int, TimestampArray.Builder> _colTimestampBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colXAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colYAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colZAxisBuilderDict; + + public Dictionary<string, string> activityLabel = new Dictionary<string, string>() + { + {"walking", "A"}, + {"jogging", "B"}, + {"stairs", "C"}, + {"sitting", "D"}, + {"standing", "E"}, + {"typing", "F"}, + {"teeth", "G"}, + {"soup", "H"}, + {"chips", "I"}, + {"pasta", "J"}, + {"drinking", "K"}, + {"sandwich", "L"}, + {"kicking", "M"}, + {"catch", "O"}, + {"dribbling", "P"}, + {"writing", "Q"}, + {"clapping", "R"}, + {"folding", "S"}, + }; + + public SensorDataPipeline(int totalInputs, int queueCapacity) + { + _totalInputs = totalInputs; + _queueCapacity = queueCapacity; + _channel = Channel.CreateBounded<SensorData>(_queueCapacity); + _writer = _channel.Writer; + _reader = _channel.Reader; + _colSubjectIdBuilderDict = new Dictionary<int, Int32Array.Builder>(); + _colActivityLabelBuilderDict = new Dictionary<int, StringArray.Builder>(); + _colTimestampBuilderDict = new Dictionary<int, TimestampArray.Builder>(); + _colXAxisBuilderDict = 
new Dictionary<int, DoubleArray.Builder>(); + _colYAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + _colZAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + } + + public async Task WriteToChannel(int taskNumber) + { + Random rand = new Random(); + List<string> keyList = new List<string>(activityLabel.Keys); + int count = keyList.Count; + var basis = DateTimeOffset.UtcNow; + + Console.WriteLine($"Write to channel task {taskNumber} started!"); + while (await _writer.WaitToWriteAsync()) + { + string randomKey = keyList[rand.Next(count)]; + string label = activityLabel[randomKey]; + + // generate random missing values + if (rand.Next(10_000) == 9_999) + { + label = null; + } + + var item = new SensorData + { + // approximately 9_000 unique subjects/sensors + subjectId = rand.Next(1_001, 1_123), Review comment: I don't think the comment aligns with the code. `rand.Next(1_001, 1_123)` will only produce 122 different values (1,001 through 1,122, because the upper bound is exclusive), not approximately 9,000. ########## File path: csharp/examples/IoTDataPipelineExample/README.md ########## @@ -0,0 +1,29 @@ +# .NET IoT Analytics + +## Introduction + +For Big Data problems, people will find that most of the technology and resources are for Java, Python, R, and Scala. +Unfortunately, it's not the same for C#. But with Apache Arrow, a language-agnostic data format, the gap between Big Data Technology and +.Net Enterprise Software Development can be bridged. + +## The original dataset is: + +[WISDM Smartphone and Smartwatch Activity and Biometrics Dataset Dataset](https://archive.ics.uci.edu/ml/datasets/WISDM+Smartphone+and+Smartwatch+Activity+and+Biometrics+Dataset+): + Contains accelerometer and gyroscope time-series sensor data collected from a smartphone and smartwatch as 51 test subjects perform 18 activities for 3 minutes each.
+ +## The sample dataset used in this example is randomly generated in order to test arrow in large-scale data scenario + +The sample dataset includes activity data from 1000 participants from an activity recognition project. +Each participant performed each of the 18 activities for a total amount of one billion accelerometer data events +reported from smartphone and smartwatch. + +* Timestamp is the time at which the sensor reported the reading. +* X_Axis is the g-force acceleration along the x-axis. +* Y_Axis is the g-force acceleration along the y-axis. +* Z_Axis is the g-force acceleration along the z-axis. + +## Persist data in Arrow format + +The application has one producer thread produces data and one consumer thread consumes and transforms data, since the data volume is huge, Review comment: According to Program.cs (`concurrencyLevel = 8`), there are 8 producer threads. ```suggestion The application has eight producer threads that produce data and one consumer thread that consumes and transforms data, since the data volume is huge, ``` ########## File path: csharp/examples/IoTDataPipelineExample/Model/SensorDataPipeline.cs ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License.
+ +using System; +using System.IO; +using System.Threading.Tasks; +using System.Collections.Generic; +using Apache.Arrow; +using Apache.Arrow.Ipc; +using Apache.Arrow.Memory; +using System.Threading.Channels; +using System.Threading; +using Apache.Arrow.Types; + +namespace IoTPipelineExample +{ + public class SensorDataPipeline + { + private int _size; + private readonly int _totalInputs; + private readonly int _queueCapacity; + private readonly Channel<SensorData> _channel; + ChannelWriter<SensorData> _writer; + ChannelReader<SensorData> _reader; + + private readonly Dictionary<int, Int32Array.Builder> _colSubjectIdBuilderDict; + private readonly Dictionary<int, StringArray.Builder> _colActivityLabelBuilderDict; + private readonly Dictionary<int, TimestampArray.Builder> _colTimestampBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colXAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colYAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colZAxisBuilderDict; + + public Dictionary<string, string> activityLabel = new Dictionary<string, string>() + { + {"walking", "A"}, + {"jogging", "B"}, + {"stairs", "C"}, + {"sitting", "D"}, + {"standing", "E"}, + {"typing", "F"}, + {"teeth", "G"}, + {"soup", "H"}, + {"chips", "I"}, + {"pasta", "J"}, + {"drinking", "K"}, + {"sandwich", "L"}, + {"kicking", "M"}, + {"catch", "O"}, + {"dribbling", "P"}, + {"writing", "Q"}, + {"clapping", "R"}, + {"folding", "S"}, + }; + + public SensorDataPipeline(int totalInputs, int queueCapacity) + { + _totalInputs = totalInputs; + _queueCapacity = queueCapacity; + _channel = Channel.CreateBounded<SensorData>(_queueCapacity); + _writer = _channel.Writer; + _reader = _channel.Reader; + _colSubjectIdBuilderDict = new Dictionary<int, Int32Array.Builder>(); + _colActivityLabelBuilderDict = new Dictionary<int, StringArray.Builder>(); + _colTimestampBuilderDict = new Dictionary<int, TimestampArray.Builder>(); + _colXAxisBuilderDict = 
new Dictionary<int, DoubleArray.Builder>(); + _colYAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + _colZAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + } + + public async Task WriteToChannel(int taskNumber) + { + Random rand = new Random(); + List<string> keyList = new List<string>(activityLabel.Keys); + int count = keyList.Count; + var basis = DateTimeOffset.UtcNow; + + Console.WriteLine($"Write to channel task {taskNumber} started!"); + while (await _writer.WaitToWriteAsync()) + { + string randomKey = keyList[rand.Next(count)]; + string label = activityLabel[randomKey]; + + // generate random missing values + if (rand.Next(10_000) == 9_999) + { + label = null; + } + + var item = new SensorData + { + // approximately 9_000 unique subjects/sensors + subjectId = rand.Next(1_001, 1_123), + activityLabel = label, + timestamp = basis.AddMilliseconds(1), + x_Axis = rand.NextDouble(), + y_Axis = rand.NextDouble(), + z_Axis = rand.NextDouble(), + }; + + if (_writer.TryWrite(item)) + { + Interlocked.Increment(ref _size); + + if (_size >= _totalInputs) + { + _writer.TryComplete(); + } + } + } + + Console.WriteLine($"Write to channel task {taskNumber} finished!"); + } + + public async Task ReadFromChannel() + { + + Console.WriteLine($"Read from channel task started!"); + while (await _reader.WaitToReadAsync()) + { + while (_reader.TryRead(out SensorData item)) + { + int subjectId = (int)item.subjectId; + + if (item != null) + { + if (!_colSubjectIdBuilderDict.ContainsKey(subjectId)) + { + _colSubjectIdBuilderDict.Add(subjectId, new Int32Array.Builder()); + _colActivityLabelBuilderDict.Add(subjectId, new StringArray.Builder()); + _colTimestampBuilderDict.Add(subjectId, new TimestampArray.Builder()); + _colXAxisBuilderDict.Add(subjectId, new DoubleArray.Builder()); + _colYAxisBuilderDict.Add(subjectId, new DoubleArray.Builder()); + _colZAxisBuilderDict.Add(subjectId, new DoubleArray.Builder()); + } + 
_colSubjectIdBuilderDict[subjectId].Append((int)item.subjectId); + _colActivityLabelBuilderDict[subjectId].Append(item.activityLabel); + _colTimestampBuilderDict[subjectId].Append((DateTimeOffset)item.timestamp); + _colXAxisBuilderDict[subjectId].Append((double)item.x_Axis); + _colYAxisBuilderDict[subjectId].Append((double)item.y_Axis); + _colZAxisBuilderDict[subjectId].Append((double)item.z_Axis); + } + } + } + Console.WriteLine($"Read from channel task finished!"); + } + + public async Task<string> PersistData() + { + List<RecordBatch> recordBatches = new List<RecordBatch>(); + + string currentPath = Directory.GetCurrentDirectory(); + string arrowDataPath = Path.Combine(currentPath, "arrow"); + if (!Directory.Exists(arrowDataPath)) + Directory.CreateDirectory(arrowDataPath); + + var memoryAllocator = new NativeMemoryAllocator(alignment: 64); + + foreach (var key in _colSubjectIdBuilderDict.Keys) + { + var subjectId = key; + + //var recordBatch = new RecordBatch.Builder(memoryAllocator) + //.Append("SubjectId", false, _colSubjectIdBuilderDict[subjectId].Build()) + //.Append("ActivityLabel", false, _colActivityLabelBuilderDict[subjectId].Build()) + //.Append("Timestamp", false, _colTimestampBuilderDict[subjectId].Build()) + //.Append("XAxis", false, _colXAxisBuilderDict[subjectId].Build()) + //.Append("YAxis", false, _colYAxisBuilderDict[subjectId].Build()) + //.Append("ZAxis", false, _colZAxisBuilderDict[subjectId].Build()) + //.Build(); Review comment: Is this needed? ```suggestion ``` ########## File path: csharp/examples/IoTDataPipelineExample/Model/SensorDataPipeline.cs ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.IO; +using System.Threading.Tasks; +using System.Collections.Generic; +using Apache.Arrow; +using Apache.Arrow.Ipc; +using Apache.Arrow.Memory; +using System.Threading.Channels; +using System.Threading; +using Apache.Arrow.Types; + +namespace IoTPipelineExample +{ + public class SensorDataPipeline + { + private int _size; + private readonly int _totalInputs; + private readonly int _queueCapacity; + private readonly Channel<SensorData> _channel; + ChannelWriter<SensorData> _writer; + ChannelReader<SensorData> _reader; + + private readonly Dictionary<int, Int32Array.Builder> _colSubjectIdBuilderDict; + private readonly Dictionary<int, StringArray.Builder> _colActivityLabelBuilderDict; + private readonly Dictionary<int, TimestampArray.Builder> _colTimestampBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colXAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colYAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colZAxisBuilderDict; + + public Dictionary<string, string> activityLabel = new Dictionary<string, string>() + { + {"walking", "A"}, + {"jogging", "B"}, + {"stairs", "C"}, + {"sitting", "D"}, + {"standing", "E"}, + {"typing", "F"}, + {"teeth", "G"}, + {"soup", "H"}, + {"chips", "I"}, + {"pasta", "J"}, + {"drinking", "K"}, + {"sandwich", "L"}, + {"kicking", "M"}, + {"catch", "O"}, + 
{"dribbling", "P"}, + {"writing", "Q"}, + {"clapping", "R"}, + {"folding", "S"}, + }; + + public SensorDataPipeline(int totalInputs, int queueCapacity) + { + _totalInputs = totalInputs; + _queueCapacity = queueCapacity; + _channel = Channel.CreateBounded<SensorData>(_queueCapacity); + _writer = _channel.Writer; + _reader = _channel.Reader; + _colSubjectIdBuilderDict = new Dictionary<int, Int32Array.Builder>(); + _colActivityLabelBuilderDict = new Dictionary<int, StringArray.Builder>(); + _colTimestampBuilderDict = new Dictionary<int, TimestampArray.Builder>(); + _colXAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + _colYAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + _colZAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + } + + public async Task WriteToChannel(int taskNumber) + { + Random rand = new Random(); + List<string> keyList = new List<string>(activityLabel.Keys); + int count = keyList.Count; + var basis = DateTimeOffset.UtcNow; + + Console.WriteLine($"Write to channel task {taskNumber} started!"); + while (await _writer.WaitToWriteAsync()) + { + string randomKey = keyList[rand.Next(count)]; + string label = activityLabel[randomKey]; + + // generate random missing values + if (rand.Next(10_000) == 9_999) + { + label = null; + } + + var item = new SensorData + { + // approximately 9_000 unique subjects/sensors + subjectId = rand.Next(1_001, 1_123), + activityLabel = label, + timestamp = basis.AddMilliseconds(1), Review comment: Every item written by this task is going to get the same timestamp, right? `basis` is captured once before the loop and never advances, so `basis.AddMilliseconds(1)` yields the same value on every iteration. That seems wrong. Maybe this should just be (note the trailing comma, since this line is inside an object initializer): ```suggestion timestamp = DateTimeOffset.UtcNow, ``` ########## File path: csharp/examples/IoTDataPipelineExample/Model/SensorData.cs ########## @@ -0,0 +1,32 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.Collections.Generic; +using System.Text; + +namespace IoTPipelineExample +{ + public class SensorData + { + //Subject-id, Activity Label, Timestamp, x, y, z + public int? subjectId { get; set; } + public string? activityLabel { get; set; } + public DateTimeOffset? timestamp { get; set; } + public double? x_Axis { get; set; } + public double? y_Axis { get; set; } + public double? z_Axis { get; set; } Review comment: Can these names follow standard .NET naming (PascalCase, no underscores, matching the `XAxis`/`YAxis`/`ZAxis` schema field names)? ```suggestion public int? SubjectId { get; set; } public string? ActivityLabel { get; set; } public DateTimeOffset? Timestamp { get; set; } public double? XAxis { get; set; } public double? YAxis { get; set; } public double? ZAxis { get; set; } ``` ########## File path: csharp/examples/IoTDataPipelineExample/Program.cs ########## @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License.
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.IO; +using System.Collections.Generic; +using System.Threading.Tasks; +using Apache.Arrow.Ipc; +using Apache.Arrow; + +namespace IoTPipelineExample +{ + class Program + { + public static int concurrencyLevel = 8; + public static int totalInputs = 10_000; + public static int queueCapacity = 1_000_000; Review comment: These values are a bit confusing. If we are only going to create 10,000 inputs, why would we need a queue capacity of 1 million? ########## File path: csharp/examples/IoTDataPipelineExample/Program.cs ########## @@ -0,0 +1,107 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.IO; +using System.Collections.Generic; +using System.Threading.Tasks; +using Apache.Arrow.Ipc; +using Apache.Arrow; + +namespace IoTPipelineExample +{ + class Program + { + public static int concurrencyLevel = 8; + public static int totalInputs = 10_000; + public static int queueCapacity = 1_000_000; + + public static async Task Main(string[] args) + { + SensorDataPipeline sdp = new SensorDataPipeline(totalInputs, queueCapacity); + List<Task> tasks = new List<Task>(); + Dictionary<string, List<RecordBatch>> recordBatchDict = new Dictionary<string, List<RecordBatch>>(); + + Console.WriteLine("Producing IoT sensor data..."); + for (int i = 0; i < concurrencyLevel; i++) + { + int j = i; + Task t = Task.Run(() => sdp.WriteToChannel(j)); + tasks.Add(t); + } + + Console.WriteLine("Consuming IoT sensor data..."); + tasks.Add(Task.Run(() => sdp.ReadFromChannel())); + + Console.WriteLine("Waiting for all tasks to complete..."); + Task.WaitAll(tasks.ToArray()); + + Console.WriteLine("Persisting data to disk..."); + var arrowDataPath = await sdp.PersistData(); + //string currentPath = Directory.GetCurrentDirectory(); + //string arrowDataPath = Path.Combine(currentPath, "arrow"); + Console.WriteLine("Loading arrow data file into memory..."); + string[] fileEntries = Directory.GetFiles(arrowDataPath); + + foreach (string fileEntry in fileEntries) + { + Console.WriteLine($"Reading data from arrow file {Path.GetFileName(fileEntry)}..."); + + using (var stream = File.OpenRead(fileEntry)) + using (var reader = new ArrowFileReader(stream)) + { + try + { + int count = await reader.RecordBatchCountAsync(); + + for (int i = 0; i < count; i++) + { + var recordBatch = await reader.ReadRecordBatchAsync(i); + + //for (int j = 0; j < recordBatch.ColumnCount; j++) + //{ + // Console.WriteLine($"RecordBatch {i.ToString().PadLeft(6)} Column {j} Length is: " + // + recordBatch.Column(j).Data.Length.ToString().PadLeft(6) + // + " NULL Count is: " + // + 
recordBatch.Column(j).Data.NullCount); + //} Review comment: Can this be uncommented? If not, should it be removed? ```suggestion for (int j = 0; j < recordBatch.ColumnCount; j++) { Console.WriteLine($"RecordBatch {i.ToString().PadLeft(6)} Column {j} Length is: " + recordBatch.Column(j).Data.Length.ToString().PadLeft(6) + " NULL Count is: " + recordBatch.Column(j).Data.NullCount); } ``` ########## File path: csharp/examples/IoTDataPipelineExample/Model/SensorDataPipeline.cs ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.IO; +using System.Threading.Tasks; +using System.Collections.Generic; +using Apache.Arrow; +using Apache.Arrow.Ipc; +using Apache.Arrow.Memory; +using System.Threading.Channels; +using System.Threading; +using Apache.Arrow.Types; + +namespace IoTPipelineExample +{ + public class SensorDataPipeline + { + private int _size; + private readonly int _totalInputs; + private readonly int _queueCapacity; + private readonly Channel<SensorData> _channel; + ChannelWriter<SensorData> _writer; + ChannelReader<SensorData> _reader; + + private readonly Dictionary<int, Int32Array.Builder> _colSubjectIdBuilderDict; + private readonly Dictionary<int, StringArray.Builder> _colActivityLabelBuilderDict; + private readonly Dictionary<int, TimestampArray.Builder> _colTimestampBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colXAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colYAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colZAxisBuilderDict; + + public Dictionary<string, string> activityLabel = new Dictionary<string, string>() + { + {"walking", "A"}, + {"jogging", "B"}, + {"stairs", "C"}, + {"sitting", "D"}, + {"standing", "E"}, + {"typing", "F"}, + {"teeth", "G"}, + {"soup", "H"}, + {"chips", "I"}, + {"pasta", "J"}, + {"drinking", "K"}, + {"sandwich", "L"}, + {"kicking", "M"}, + {"catch", "O"}, + {"dribbling", "P"}, + {"writing", "Q"}, + {"clapping", "R"}, + {"folding", "S"}, + }; + + public SensorDataPipeline(int totalInputs, int queueCapacity) + { + _totalInputs = totalInputs; + _queueCapacity = queueCapacity; + _channel = Channel.CreateBounded<SensorData>(_queueCapacity); + _writer = _channel.Writer; + _reader = _channel.Reader; + _colSubjectIdBuilderDict = new Dictionary<int, Int32Array.Builder>(); + _colActivityLabelBuilderDict = new Dictionary<int, StringArray.Builder>(); + _colTimestampBuilderDict = new Dictionary<int, TimestampArray.Builder>(); + _colXAxisBuilderDict = 
new Dictionary<int, DoubleArray.Builder>(); + _colYAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + _colZAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + } + + public async Task WriteToChannel(int taskNumber) + { + Random rand = new Random(); + List<string> keyList = new List<string>(activityLabel.Keys); + int count = keyList.Count; + var basis = DateTimeOffset.UtcNow; + + Console.WriteLine($"Write to channel task {taskNumber} started!"); + while (await _writer.WaitToWriteAsync()) + { + string randomKey = keyList[rand.Next(count)]; + string label = activityLabel[randomKey]; + + // generate random missing values + if (rand.Next(10_000) == 9_999) + { + label = null; + } + + var item = new SensorData + { + // approximately 9_000 unique subjects/sensors + subjectId = rand.Next(1_001, 1_123), + activityLabel = label, + timestamp = basis.AddMilliseconds(1), + x_Axis = rand.NextDouble(), + y_Axis = rand.NextDouble(), + z_Axis = rand.NextDouble(), + }; + + if (_writer.TryWrite(item)) + { + Interlocked.Increment(ref _size); + + if (_size >= _totalInputs) + { + _writer.TryComplete(); + } + } + } + + Console.WriteLine($"Write to channel task {taskNumber} finished!"); + } + + public async Task ReadFromChannel() + { + + Console.WriteLine($"Read from channel task started!"); + while (await _reader.WaitToReadAsync()) + { + while (_reader.TryRead(out SensorData item)) + { + int subjectId = (int)item.subjectId; + + if (item != null) + { + if (!_colSubjectIdBuilderDict.ContainsKey(subjectId)) + { + _colSubjectIdBuilderDict.Add(subjectId, new Int32Array.Builder()); + _colActivityLabelBuilderDict.Add(subjectId, new StringArray.Builder()); + _colTimestampBuilderDict.Add(subjectId, new TimestampArray.Builder()); + _colXAxisBuilderDict.Add(subjectId, new DoubleArray.Builder()); + _colYAxisBuilderDict.Add(subjectId, new DoubleArray.Builder()); + _colZAxisBuilderDict.Add(subjectId, new DoubleArray.Builder()); + } + 
_colSubjectIdBuilderDict[subjectId].Append((int)item.subjectId); + _colActivityLabelBuilderDict[subjectId].Append(item.activityLabel); + _colTimestampBuilderDict[subjectId].Append((DateTimeOffset)item.timestamp); + _colXAxisBuilderDict[subjectId].Append((double)item.x_Axis); + _colYAxisBuilderDict[subjectId].Append((double)item.y_Axis); + _colZAxisBuilderDict[subjectId].Append((double)item.z_Axis); + } + } + } + Console.WriteLine($"Read from channel task finished!"); + } + + public async Task<string> PersistData() + { + List<RecordBatch> recordBatches = new List<RecordBatch>(); + + string currentPath = Directory.GetCurrentDirectory(); + string arrowDataPath = Path.Combine(currentPath, "arrow"); + if (!Directory.Exists(arrowDataPath)) + Directory.CreateDirectory(arrowDataPath); + + var memoryAllocator = new NativeMemoryAllocator(alignment: 64); + + foreach (var key in _colSubjectIdBuilderDict.Keys) + { + var subjectId = key; + + //var recordBatch = new RecordBatch.Builder(memoryAllocator) + //.Append("SubjectId", false, _colSubjectIdBuilderDict[subjectId].Build()) + //.Append("ActivityLabel", false, _colActivityLabelBuilderDict[subjectId].Build()) + //.Append("Timestamp", false, _colTimestampBuilderDict[subjectId].Build()) + //.Append("XAxis", false, _colXAxisBuilderDict[subjectId].Build()) + //.Append("YAxis", false, _colYAxisBuilderDict[subjectId].Build()) + //.Append("ZAxis", false, _colZAxisBuilderDict[subjectId].Build()) + //.Build(); + Schema.Builder schemaBuilder = new Schema.Builder(); + + schemaBuilder.Field(new Field("SubjectId", Int32Type.Default, nullable: false)); + schemaBuilder.Field(new Field("ActivityLabel", StringType.Default, nullable: false)); + schemaBuilder.Field(new Field("Timestamp", Int64Type.Default, nullable: false)); Review comment: I think this should use the `TimestampType`. 
```suggestion schemaBuilder.Field(new Field("Timestamp", TimestampType.Default, nullable: false)); ``` ########## File path: csharp/examples/IoTDataPipelineExample/Model/SensorDataPipeline.cs ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +using System; +using System.IO; +using System.Threading.Tasks; +using System.Collections.Generic; +using Apache.Arrow; +using Apache.Arrow.Ipc; +using Apache.Arrow.Memory; +using System.Threading.Channels; +using System.Threading; +using Apache.Arrow.Types; + +namespace IoTPipelineExample +{ + public class SensorDataPipeline + { + private int _size; + private readonly int _totalInputs; + private readonly int _queueCapacity; + private readonly Channel<SensorData> _channel; + ChannelWriter<SensorData> _writer; + ChannelReader<SensorData> _reader; + + private readonly Dictionary<int, Int32Array.Builder> _colSubjectIdBuilderDict; + private readonly Dictionary<int, StringArray.Builder> _colActivityLabelBuilderDict; + private readonly Dictionary<int, TimestampArray.Builder> _colTimestampBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colXAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colYAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colZAxisBuilderDict; + + public Dictionary<string, string> activityLabel = new Dictionary<string, string>() + { + {"walking", "A"}, + {"jogging", "B"}, + {"stairs", "C"}, + {"sitting", "D"}, + {"standing", "E"}, + {"typing", "F"}, + {"teeth", "G"}, + {"soup", "H"}, + {"chips", "I"}, + {"pasta", "J"}, + {"drinking", "K"}, + {"sandwich", "L"}, + {"kicking", "M"}, + {"catch", "O"}, + {"dribbling", "P"}, + {"writing", "Q"}, + {"clapping", "R"}, + {"folding", "S"}, + }; + + public SensorDataPipeline(int totalInputs, int queueCapacity) + { + _totalInputs = totalInputs; + _queueCapacity = queueCapacity; + _channel = Channel.CreateBounded<SensorData>(_queueCapacity); + _writer = _channel.Writer; + _reader = _channel.Reader; + _colSubjectIdBuilderDict = new Dictionary<int, Int32Array.Builder>(); + _colActivityLabelBuilderDict = new Dictionary<int, StringArray.Builder>(); + _colTimestampBuilderDict = new Dictionary<int, TimestampArray.Builder>(); + _colXAxisBuilderDict = 
new Dictionary<int, DoubleArray.Builder>(); + _colYAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + _colZAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + } + + public async Task WriteToChannel(int taskNumber) + { + Random rand = new Random(); + List<string> keyList = new List<string>(activityLabel.Keys); + int count = keyList.Count; + var basis = DateTimeOffset.UtcNow; + + Console.WriteLine($"Write to channel task {taskNumber} started!"); + while (await _writer.WaitToWriteAsync()) + { + string randomKey = keyList[rand.Next(count)]; + string label = activityLabel[randomKey]; + + // generate random missing values + if (rand.Next(10_000) == 9_999) + { + label = null; + } + + var item = new SensorData + { + // approximately 9_000 unique subjects/sensors + subjectId = rand.Next(1_001, 1_123), + activityLabel = label, + timestamp = basis.AddMilliseconds(1), + x_Axis = rand.NextDouble(), + y_Axis = rand.NextDouble(), + z_Axis = rand.NextDouble(), + }; + + if (_writer.TryWrite(item)) + { + Interlocked.Increment(ref _size); + + if (_size >= _totalInputs) + { + _writer.TryComplete(); + } + } + } + + Console.WriteLine($"Write to channel task {taskNumber} finished!"); + } + + public async Task ReadFromChannel() + { + + Console.WriteLine($"Read from channel task started!"); + while (await _reader.WaitToReadAsync()) + { + while (_reader.TryRead(out SensorData item)) + { + int subjectId = (int)item.subjectId; + + if (item != null) + { Review comment: We should be checking for `null` before dereferencing `item`. ```suggestion if (item != null) { int subjectId = (int)item.subjectId; ``` ########## File path: csharp/examples/IoTDataPipelineExample/Model/SensorDataPipeline.cs ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. 
+// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.IO; +using System.Threading.Tasks; +using System.Collections.Generic; +using Apache.Arrow; +using Apache.Arrow.Ipc; +using Apache.Arrow.Memory; +using System.Threading.Channels; +using System.Threading; +using Apache.Arrow.Types; + +namespace IoTPipelineExample +{ + public class SensorDataPipeline + { + private int _size; + private readonly int _totalInputs; + private readonly int _queueCapacity; Review comment: I don't think we need this field. The parameter is just used to init the Channel. No reason to hold on to this int. ```suggestion ``` ########## File path: csharp/examples/IoTDataPipelineExample/Model/SensorDataPipeline.cs ########## @@ -0,0 +1,225 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +using System; +using System.IO; +using System.Threading.Tasks; +using System.Collections.Generic; +using Apache.Arrow; +using Apache.Arrow.Ipc; +using Apache.Arrow.Memory; +using System.Threading.Channels; +using System.Threading; +using Apache.Arrow.Types; + +namespace IoTPipelineExample +{ + public class SensorDataPipeline + { + private int _size; + private readonly int _totalInputs; + private readonly int _queueCapacity; + private readonly Channel<SensorData> _channel; + ChannelWriter<SensorData> _writer; + ChannelReader<SensorData> _reader; + + private readonly Dictionary<int, Int32Array.Builder> _colSubjectIdBuilderDict; + private readonly Dictionary<int, StringArray.Builder> _colActivityLabelBuilderDict; + private readonly Dictionary<int, TimestampArray.Builder> _colTimestampBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colXAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colYAxisBuilderDict; + private readonly Dictionary<int, DoubleArray.Builder> _colZAxisBuilderDict; + + public Dictionary<string, string> activityLabel = new Dictionary<string, string>() + { + {"walking", "A"}, + {"jogging", "B"}, + {"stairs", "C"}, + {"sitting", "D"}, + {"standing", "E"}, + {"typing", "F"}, + {"teeth", "G"}, + {"soup", "H"}, + {"chips", "I"}, + {"pasta", "J"}, + {"drinking", "K"}, + {"sandwich", "L"}, + {"kicking", "M"}, + {"catch", "O"}, + {"dribbling", "P"}, + {"writing", "Q"}, + {"clapping", "R"}, + {"folding", "S"}, + }; + + public SensorDataPipeline(int totalInputs, int queueCapacity) + { + _totalInputs = totalInputs; + _queueCapacity = queueCapacity; + _channel = Channel.CreateBounded<SensorData>(_queueCapacity); + _writer = _channel.Writer; + _reader = _channel.Reader; + _colSubjectIdBuilderDict = new Dictionary<int, Int32Array.Builder>(); + _colActivityLabelBuilderDict = new Dictionary<int, 
StringArray.Builder>(); + _colTimestampBuilderDict = new Dictionary<int, TimestampArray.Builder>(); + _colXAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + _colYAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + _colZAxisBuilderDict = new Dictionary<int, DoubleArray.Builder>(); + } + + public async Task WriteToChannel(int taskNumber) + { + Random rand = new Random(); + List<string> keyList = new List<string>(activityLabel.Keys); + int count = keyList.Count; + var basis = DateTimeOffset.UtcNow; + + Console.WriteLine($"Write to channel task {taskNumber} started!"); + while (await _writer.WaitToWriteAsync()) + { + string randomKey = keyList[rand.Next(count)]; + string label = activityLabel[randomKey]; + + // generate random missing values + if (rand.Next(10_000) == 9_999) + { + label = null; + } + + var item = new SensorData + { + // approximately 9_000 unique subjects/sensors + subjectId = rand.Next(1_001, 1_123), + activityLabel = label, + timestamp = basis.AddMilliseconds(1), + x_Axis = rand.NextDouble(), + y_Axis = rand.NextDouble(), + z_Axis = rand.NextDouble(), + }; + + if (_writer.TryWrite(item)) + { + Interlocked.Increment(ref _size); + + if (_size >= _totalInputs) + { + _writer.TryComplete(); + } + } + } + + Console.WriteLine($"Write to channel task {taskNumber} finished!"); + } + + public async Task ReadFromChannel() + { + + Console.WriteLine($"Read from channel task started!"); + while (await _reader.WaitToReadAsync()) + { + while (_reader.TryRead(out SensorData item)) + { + int subjectId = (int)item.subjectId; + + if (item != null) + { + if (!_colSubjectIdBuilderDict.ContainsKey(subjectId)) + { + _colSubjectIdBuilderDict.Add(subjectId, new Int32Array.Builder()); + _colActivityLabelBuilderDict.Add(subjectId, new StringArray.Builder()); + _colTimestampBuilderDict.Add(subjectId, new TimestampArray.Builder()); + _colXAxisBuilderDict.Add(subjectId, new DoubleArray.Builder()); + _colYAxisBuilderDict.Add(subjectId, new 
DoubleArray.Builder()); + _colZAxisBuilderDict.Add(subjectId, new DoubleArray.Builder()); + } + _colSubjectIdBuilderDict[subjectId].Append((int)item.subjectId); + _colActivityLabelBuilderDict[subjectId].Append(item.activityLabel); + _colTimestampBuilderDict[subjectId].Append((DateTimeOffset)item.timestamp); + _colXAxisBuilderDict[subjectId].Append((double)item.x_Axis); + _colYAxisBuilderDict[subjectId].Append((double)item.y_Axis); + _colZAxisBuilderDict[subjectId].Append((double)item.z_Axis); + } + } + } + Console.WriteLine($"Read from channel task finished!"); + } + + public async Task<string> PersistData() + { + List<RecordBatch> recordBatches = new List<RecordBatch>(); + + string currentPath = Directory.GetCurrentDirectory(); + string arrowDataPath = Path.Combine(currentPath, "arrow"); + if (!Directory.Exists(arrowDataPath)) + Directory.CreateDirectory(arrowDataPath); + + var memoryAllocator = new NativeMemoryAllocator(alignment: 64); Review comment: This doesn't appear to be used. ```suggestion ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@arrow.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org