eerhardt commented on a change in pull request #11874:
URL: https://github.com/apache/arrow/pull/11874#discussion_r771593521
##########
File path: csharp/examples/IoTExample/IoTExample.csproj
##########
@@ -0,0 +1,16 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+ <PropertyGroup>
+ <OutputType>Exe</OutputType>
+ <TargetFramework>netcoreapp2.1</TargetFramework>
Review comment:
```suggestion
<TargetFramework>netcoreapp3.1</TargetFramework>
```
.NET Core 2.1 has reached end of life. Can you update this to 3.1?
##########
File path: csharp/examples/IoTExample/README.md
##########
@@ -0,0 +1,29 @@
+# .NET IoT Analytics
+
+## Introduction
+
+For Big Data problems, people will find that most of the technology and
resources are for Java, Python, R, and Scala.
+Unfortunately, it's not the same for C#. But with Apache Arrow, a
language-agnostic data format, the gap between Big Data Technology and
+.Net Enterprise Software Development can be bridged.
+
+## The original dataset is:
+
+[WISDM Smartphone and Smartwatch Activity and Biometrics Dataset
Dataset](https://archive.ics.uci.edu/ml/datasets/WISDM+Smartphone+and+Smartwatch+Activity+and+Biometrics+Dataset+):
+ Contains accelerometer and gyroscope time-series sensor data collected from
a smartphone and smartwatch as 51 test subjects perform 18 activities for 3
minutes each.
+
+## The sample dataset used in this example is randomly generated in order to
test arrow in large-scale data scenario
+
+The sample dataset includes activity data from 1000 participants from an
activity recognition project.
+Each participant performed each of the 18 activities for a total amount of one
billion accelerometer data events
+reported from smartphone and smartwatch.
+
+Timestamp is the time at which the sensor reported the reading.
+X_Axis is the g-force acceleration along the x-axis.
+Y_Axis is the g-force acceleration along the y-axis.
+Z_Axis is the g-force acceleration along the z-axis.
Review comment:
```suggestion
* Timestamp is the time at which the sensor reported the reading.
* X_Axis is the g-force acceleration along the x-axis.
* Y_Axis is the g-force acceleration along the y-axis.
* Z_Axis is the g-force acceleration along the z-axis.
```
Formatting these as a bulleted list would make them easier to read.
##########
File path: csharp/examples/IoTExample/Model/SampleDataset.cs
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using System.Diagnostics;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.Arrow;
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Memory;
+
+namespace IoTExample
+{
+ public class SampleDataset
+ {
+ private int _size;
+ private readonly int _inputs;
+ private readonly int _capacity;
+ private readonly BlockingCollection<SensorData> _rows;
+
+ private readonly List<int> _colSubjectId;
+ private readonly List<string> _colActivityLabel;
+ private readonly List<long> _colTimestamp;
+ private readonly List<double> _colXAxis;
+ private readonly List<double> _colYAxis;
+ private readonly List<double> _colZAxis;
+
+ private readonly int _threshold = 1_000_000;
+ private readonly List<RecordBatch> _recordBatches;
+ private readonly MemoryAllocator _memoryAllocator;
+
+ public Dictionary<string, string> activityLabel = new
Dictionary<string, string>()
+ {
+ {"walking", "A"},
+ {"jogging", "B"},
+ {"stairs", "C"},
+ {"sitting", "D"},
+ {"standing", "E"},
+ {"typing", "F"},
+ {"teeth", "G"},
+ {"soup", "H"},
+ {"chips", "I"},
+ {"pasta", "J"},
+ {"drinking", "K"},
+ {"sandwich", "L"},
+ {"kicking", "M"},
+ {"catch", "O"},
+ {"dribbling", "P"},
+ {"writing", "Q"},
+ {"clapping", "R"},
+ {"folding", "S"},
+ };
+
+ public SampleDataset(int inputs, int capacity)
+ {
+ _inputs = inputs;
+ _capacity = capacity;
+ _rows = new BlockingCollection<SensorData>(_capacity);
+
+ _colSubjectId = new List<int>(capacity);
+ _colActivityLabel = new List<string>();
+ _colTimestamp = new List<long>();
+ _colXAxis = new List<double>();
+ _colYAxis = new List<double>();
+ _colZAxis = new List<double>();
+
+ _recordBatches = new List<RecordBatch>();
+ _memoryAllocator = new NativeMemoryAllocator(alignment: 64);
+ }
+
+ public void Produce()
+ {
+ Random rand = new Random();
+ bool success;
+
+ List<string> keyList = new List<string>(activityLabel.Keys);
+ int count = keyList.Count;
+
+ DateTime now = DateTime.Now;
+ long unixTime = ((DateTimeOffset)now).ToUnixTimeSeconds();
+
+ while (_size < _inputs)
+ {
+ string randomKey = keyList[rand.Next(count)];
+ string label = activityLabel[randomKey];
+
+ // generate missing values
+ if (rand.Next(10_000) == 9_999)
+ {
+ label = null;
+ }
+
+ success = _rows.TryAdd(new SensorData
+ {
+ subjectId = rand.Next(1000, 2001),
+ activityLabel = label,
+ timestamp = unixTime++,
+ x_Axis = rand.NextDouble(),
+ y_Axis = rand.NextDouble(),
+ z_Axis = rand.NextDouble(),
+ });
+
+ if (success)
+ {
+ //Console.WriteLine($"Enqueue Task 0");
+ _size++;
+ }
+ else
+ {
+ Console.WriteLine("Producing is blocked, percent completed
is: {0}%", Math.Round((double)_size / _inputs, 4) * 100);
+ }
+ }
+
+ _rows.CompleteAdding();
+ }
+
+ public async void Consume()
+ {
+ while (!_rows.IsCompleted)
+ {
+ if (!_rows.TryTake(out SensorData item, 3000))
+ {
+ Console.WriteLine("Consuming is blocked!");
+ }
+ else
+ {
+ //Console.WriteLine($"Dequeue Task 1");
+ if (item != null && item.subjectId != null)
+ {
+ _colSubjectId.Add((int)item.subjectId);
+ _colActivityLabel.Add(item.activityLabel);
+ _colTimestamp.Add((long)item.timestamp);
+ _colXAxis.Add((double)item.x_Axis);
+ _colYAxis.Add((double)item.y_Axis);
+ _colZAxis.Add((double)item.z_Axis);
+ }
+ }
+
+ if (_colSubjectId.Count == _threshold)
+ {
+ // Build a record batch using the Fluent API
+ var recordBatch = new RecordBatch.Builder(_memoryAllocator)
+ .Append("SubjectId", false, col => col.Int32(array =>
array.AppendRange(_colSubjectId)))
+ .Append("ActivityLabel", false, col =>
col.String(array => array.AppendRange(_colActivityLabel)))
+ .Append("Timestamp", false, col => col.Int64(array =>
array.AppendRange(_colTimestamp)))
+ .Append("XAxis", false, col => col.Double(array =>
array.AppendRange(_colXAxis)))
+ .Append("YAxis", false, col => col.Double(array =>
array.AppendRange(_colYAxis)))
+ .Append("ZAxis", false, col => col.Double(array =>
array.AppendRange(_colZAxis)))
+ .Build();
+
+ _recordBatches.Add(recordBatch);
+
+ _colSubjectId.Clear();
+ _colActivityLabel.Clear();
+ _colTimestamp.Clear();
+ _colXAxis.Clear();
+ _colYAxis.Clear();
+ _colZAxis.Clear();
+ }
+ }
+ }
+
+ public async Task<bool> PersistData()
+ {
+ string time = DateTime.Now.ToString("yyyyMMdd_HHmmss");
+ Stopwatch stopwatch = new Stopwatch();
+ stopwatch.Start();
+
+ Schema schema = _recordBatches[0].Schema;
+ // Write record batch to a file
+ using (var stream =
File.OpenWrite(@"c:\temp\data\iotbigdata.arrow"))
Review comment:
If this path doesn't exist, you get an error at the end of the run:
```
Producing is blocked, percent completed is: 90%
Unhandled exception. System.IO.DirectoryNotFoundException: Could not find a
part of the path 'c:\temp\data\iotbigdata.arrow'.
at System.IO.FileStream.ValidateFileHandle(SafeFileHandle fileHandle)
at System.IO.FileStream.CreateFileOpenHandle(FileMode mode, FileShare
share, FileOptions options)
at System.IO.FileStream..ctor(String path, FileMode mode, FileAccess
access, FileShare share, Int32 bufferSize, FileOptions options)
at System.IO.FileStream..ctor(String path, FileMode mode, FileAccess
access, FileShare share)
at System.IO.File.OpenWrite(String path)
at IoTExample.SampleDataset.PersistData() in
C:\git\arrow\csharp\examples\IoTExample\Model\SampleDataset.cs:line 185
at IoTExample.Program.Main(String[] args) in
C:\git\arrow\csharp\examples\IoTExample\Program.cs:line 42
at IoTExample.Program.<Main>(String[] args)
```
Could we write the file to the current directory instead, so the example runs without requiring any particular folder to exist?
##########
File path: csharp/examples/IoTExample/Program.cs
##########
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using Apache.Arrow.Ipc;
+
+namespace IoTExample
+{
+ class Program
+ {
+ public static int concurrencyLevel = 1;
+ public static int totalInputs = 10_000_000;
+ public static int queueCapacity = 1_000_000;
+
+ public static async Task Main(string[] args)
+ {
+ SampleDataset sd = new SampleDataset(totalInputs, queueCapacity);
+
+ Console.WriteLine("Receiving IoT data...");
+ Task t1 = Task.Run(() => sd.Produce());
+
+ Console.WriteLine("Transforming data...");
+ Task t2 = Task.Run(() => sd.Consume());
+
+ // Wait for all tasks to complete
+ Task.WaitAll(t1, t2);
+
+ var success = await sd.PersistData();
+
+ if (!success)
+ return;
+
+ Console.WriteLine("Reading arrow files...");
+ var stream = File.OpenRead(@"c:\temp\data\iotbigdata.arrow");
Review comment:
Since this path must stay in sync with the one used in `PersistData`, it may make
sense to declare it in one place and pass it into `SampleDataset`.
##########
File path: csharp/examples/IoTExample/Model/SampleDataset.cs
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using System.Diagnostics;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.Arrow;
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Memory;
+
+namespace IoTExample
+{
+ public class SampleDataset
+ {
+ private int _size;
+ private readonly int _inputs;
+ private readonly int _capacity;
+ private readonly BlockingCollection<SensorData> _rows;
+
+ private readonly List<int> _colSubjectId;
+ private readonly List<string> _colActivityLabel;
+ private readonly List<long> _colTimestamp;
+ private readonly List<double> _colXAxis;
+ private readonly List<double> _colYAxis;
+ private readonly List<double> _colZAxis;
+
+ private readonly int _threshold = 1_000_000;
+ private readonly List<RecordBatch> _recordBatches;
+ private readonly MemoryAllocator _memoryAllocator;
+
+ public Dictionary<string, string> activityLabel = new
Dictionary<string, string>()
+ {
+ {"walking", "A"},
+ {"jogging", "B"},
+ {"stairs", "C"},
+ {"sitting", "D"},
+ {"standing", "E"},
+ {"typing", "F"},
+ {"teeth", "G"},
+ {"soup", "H"},
+ {"chips", "I"},
+ {"pasta", "J"},
+ {"drinking", "K"},
+ {"sandwich", "L"},
+ {"kicking", "M"},
+ {"catch", "O"},
+ {"dribbling", "P"},
+ {"writing", "Q"},
+ {"clapping", "R"},
+ {"folding", "S"},
+ };
+
+ public SampleDataset(int inputs, int capacity)
+ {
+ _inputs = inputs;
+ _capacity = capacity;
+ _rows = new BlockingCollection<SensorData>(_capacity);
+
+ _colSubjectId = new List<int>(capacity);
+ _colActivityLabel = new List<string>();
+ _colTimestamp = new List<long>();
+ _colXAxis = new List<double>();
+ _colYAxis = new List<double>();
+ _colZAxis = new List<double>();
+
+ _recordBatches = new List<RecordBatch>();
+ _memoryAllocator = new NativeMemoryAllocator(alignment: 64);
+ }
+
+ public void Produce()
+ {
+ Random rand = new Random();
+ bool success;
+
+ List<string> keyList = new List<string>(activityLabel.Keys);
+ int count = keyList.Count;
+
+ DateTime now = DateTime.Now;
+ long unixTime = ((DateTimeOffset)now).ToUnixTimeSeconds();
+
+ while (_size < _inputs)
+ {
+ string randomKey = keyList[rand.Next(count)];
+ string label = activityLabel[randomKey];
+
+ // generate missing values
+ if (rand.Next(10_000) == 9_999)
+ {
+ label = null;
+ }
+
+ success = _rows.TryAdd(new SensorData
+ {
+ subjectId = rand.Next(1000, 2001),
+ activityLabel = label,
+ timestamp = unixTime++,
+ x_Axis = rand.NextDouble(),
+ y_Axis = rand.NextDouble(),
+ z_Axis = rand.NextDouble(),
+ });
+
+ if (success)
+ {
+ //Console.WriteLine($"Enqueue Task 0");
+ _size++;
+ }
+ else
+ {
+ Console.WriteLine("Producing is blocked, percent completed
is: {0}%", Math.Round((double)_size / _inputs, 4) * 100);
+ }
+ }
+
+ _rows.CompleteAdding();
+ }
+
+ public async void Consume()
+ {
+ while (!_rows.IsCompleted)
+ {
+ if (!_rows.TryTake(out SensorData item, 3000))
+ {
+ Console.WriteLine("Consuming is blocked!");
+ }
+ else
+ {
+ //Console.WriteLine($"Dequeue Task 1");
+ if (item != null && item.subjectId != null)
+ {
+ _colSubjectId.Add((int)item.subjectId);
+ _colActivityLabel.Add(item.activityLabel);
+ _colTimestamp.Add((long)item.timestamp);
+ _colXAxis.Add((double)item.x_Axis);
+ _colYAxis.Add((double)item.y_Axis);
+ _colZAxis.Add((double)item.z_Axis);
+ }
+ }
+
+ if (_colSubjectId.Count == _threshold)
+ {
+ // Build a record batch using the Fluent API
+ var recordBatch = new RecordBatch.Builder(_memoryAllocator)
+ .Append("SubjectId", false, col => col.Int32(array =>
array.AppendRange(_colSubjectId)))
+ .Append("ActivityLabel", false, col =>
col.String(array => array.AppendRange(_colActivityLabel)))
+ .Append("Timestamp", false, col => col.Int64(array =>
array.AppendRange(_colTimestamp)))
+ .Append("XAxis", false, col => col.Double(array =>
array.AppendRange(_colXAxis)))
+ .Append("YAxis", false, col => col.Double(array =>
array.AppendRange(_colYAxis)))
+ .Append("ZAxis", false, col => col.Double(array =>
array.AppendRange(_colZAxis)))
+ .Build();
+
+ _recordBatches.Add(recordBatch);
+
+ _colSubjectId.Clear();
+ _colActivityLabel.Clear();
+ _colTimestamp.Clear();
+ _colXAxis.Clear();
+ _colYAxis.Clear();
+ _colZAxis.Clear();
+ }
+ }
+ }
+
+ public async Task<bool> PersistData()
Review comment:
```suggestion
public async Task PersistData()
```
This method never returns `false`, so you can change it to return a plain
`Task` instead.
##########
File path: csharp/examples/IoTExample/Model/SampleDataset.cs
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using System.Diagnostics;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.Arrow;
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Memory;
+
+namespace IoTExample
+{
+ public class SampleDataset
+ {
+ private int _size;
+ private readonly int _inputs;
+ private readonly int _capacity;
+ private readonly BlockingCollection<SensorData> _rows;
+
+ private readonly List<int> _colSubjectId;
+ private readonly List<string> _colActivityLabel;
+ private readonly List<long> _colTimestamp;
+ private readonly List<double> _colXAxis;
+ private readonly List<double> _colYAxis;
+ private readonly List<double> _colZAxis;
+
+ private readonly int _threshold = 1_000_000;
+ private readonly List<RecordBatch> _recordBatches;
+ private readonly MemoryAllocator _memoryAllocator;
+
+ public Dictionary<string, string> activityLabel = new
Dictionary<string, string>()
+ {
+ {"walking", "A"},
+ {"jogging", "B"},
+ {"stairs", "C"},
+ {"sitting", "D"},
+ {"standing", "E"},
+ {"typing", "F"},
+ {"teeth", "G"},
+ {"soup", "H"},
+ {"chips", "I"},
+ {"pasta", "J"},
+ {"drinking", "K"},
+ {"sandwich", "L"},
+ {"kicking", "M"},
+ {"catch", "O"},
+ {"dribbling", "P"},
+ {"writing", "Q"},
+ {"clapping", "R"},
+ {"folding", "S"},
+ };
+
+ public SampleDataset(int inputs, int capacity)
+ {
+ _inputs = inputs;
+ _capacity = capacity;
+ _rows = new BlockingCollection<SensorData>(_capacity);
+
+ _colSubjectId = new List<int>(capacity);
+ _colActivityLabel = new List<string>();
+ _colTimestamp = new List<long>();
+ _colXAxis = new List<double>();
+ _colYAxis = new List<double>();
+ _colZAxis = new List<double>();
+
+ _recordBatches = new List<RecordBatch>();
+ _memoryAllocator = new NativeMemoryAllocator(alignment: 64);
+ }
+
+ public void Produce()
+ {
+ Random rand = new Random();
+ bool success;
+
+ List<string> keyList = new List<string>(activityLabel.Keys);
+ int count = keyList.Count;
+
+ DateTime now = DateTime.Now;
+ long unixTime = ((DateTimeOffset)now).ToUnixTimeSeconds();
+
+ while (_size < _inputs)
+ {
+ string randomKey = keyList[rand.Next(count)];
+ string label = activityLabel[randomKey];
+
+ // generate missing values
+ if (rand.Next(10_000) == 9_999)
+ {
+ label = null;
+ }
+
+ success = _rows.TryAdd(new SensorData
+ {
+ subjectId = rand.Next(1000, 2001),
+ activityLabel = label,
+ timestamp = unixTime++,
+ x_Axis = rand.NextDouble(),
+ y_Axis = rand.NextDouble(),
+ z_Axis = rand.NextDouble(),
+ });
+
+ if (success)
+ {
+ //Console.WriteLine($"Enqueue Task 0");
+ _size++;
+ }
+ else
+ {
+ Console.WriteLine("Producing is blocked, percent completed
is: {0}%", Math.Round((double)_size / _inputs, 4) * 100);
+ }
+ }
+
+ _rows.CompleteAdding();
+ }
+
+ public async void Consume()
+ {
+ while (!_rows.IsCompleted)
+ {
+ if (!_rows.TryTake(out SensorData item, 3000))
+ {
+ Console.WriteLine("Consuming is blocked!");
+ }
+ else
+ {
+ //Console.WriteLine($"Dequeue Task 1");
Review comment:
```suggestion
```
##########
File path: csharp/examples/IoTExample/Model/SampleDataset.cs
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using System.Diagnostics;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.Arrow;
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Memory;
+
+namespace IoTExample
+{
+ public class SampleDataset
+ {
+ private int _size;
+ private readonly int _inputs;
+ private readonly int _capacity;
+ private readonly BlockingCollection<SensorData> _rows;
+
+ private readonly List<int> _colSubjectId;
+ private readonly List<string> _colActivityLabel;
+ private readonly List<long> _colTimestamp;
+ private readonly List<double> _colXAxis;
+ private readonly List<double> _colYAxis;
+ private readonly List<double> _colZAxis;
+
+ private readonly int _threshold = 1_000_000;
+ private readonly List<RecordBatch> _recordBatches;
+ private readonly MemoryAllocator _memoryAllocator;
+
+ public Dictionary<string, string> activityLabel = new
Dictionary<string, string>()
+ {
+ {"walking", "A"},
+ {"jogging", "B"},
+ {"stairs", "C"},
+ {"sitting", "D"},
+ {"standing", "E"},
+ {"typing", "F"},
+ {"teeth", "G"},
+ {"soup", "H"},
+ {"chips", "I"},
+ {"pasta", "J"},
+ {"drinking", "K"},
+ {"sandwich", "L"},
+ {"kicking", "M"},
+ {"catch", "O"},
+ {"dribbling", "P"},
+ {"writing", "Q"},
+ {"clapping", "R"},
+ {"folding", "S"},
+ };
+
+ public SampleDataset(int inputs, int capacity)
+ {
+ _inputs = inputs;
+ _capacity = capacity;
+ _rows = new BlockingCollection<SensorData>(_capacity);
+
+ _colSubjectId = new List<int>(capacity);
+ _colActivityLabel = new List<string>();
+ _colTimestamp = new List<long>();
+ _colXAxis = new List<double>();
+ _colYAxis = new List<double>();
+ _colZAxis = new List<double>();
+
+ _recordBatches = new List<RecordBatch>();
+ _memoryAllocator = new NativeMemoryAllocator(alignment: 64);
+ }
+
+ public void Produce()
+ {
+ Random rand = new Random();
+ bool success;
+
+ List<string> keyList = new List<string>(activityLabel.Keys);
+ int count = keyList.Count;
+
+ DateTime now = DateTime.Now;
+ long unixTime = ((DateTimeOffset)now).ToUnixTimeSeconds();
+
+ while (_size < _inputs)
+ {
+ string randomKey = keyList[rand.Next(count)];
+ string label = activityLabel[randomKey];
+
+ // generate missing values
+ if (rand.Next(10_000) == 9_999)
+ {
+ label = null;
+ }
+
+ success = _rows.TryAdd(new SensorData
+ {
+ subjectId = rand.Next(1000, 2001),
+ activityLabel = label,
+ timestamp = unixTime++,
+ x_Axis = rand.NextDouble(),
+ y_Axis = rand.NextDouble(),
+ z_Axis = rand.NextDouble(),
+ });
+
+ if (success)
+ {
+ //Console.WriteLine($"Enqueue Task 0");
+ _size++;
+ }
+ else
+ {
+ Console.WriteLine("Producing is blocked, percent completed
is: {0}%", Math.Round((double)_size / _inputs, 4) * 100);
+ }
+ }
+
+ _rows.CompleteAdding();
+ }
+
+ public async void Consume()
+ {
+ while (!_rows.IsCompleted)
+ {
+ if (!_rows.TryTake(out SensorData item, 3000))
+ {
+ Console.WriteLine("Consuming is blocked!");
+ }
+ else
+ {
+ //Console.WriteLine($"Dequeue Task 1");
+ if (item != null && item.subjectId != null)
+ {
+ _colSubjectId.Add((int)item.subjectId);
+ _colActivityLabel.Add(item.activityLabel);
+ _colTimestamp.Add((long)item.timestamp);
+ _colXAxis.Add((double)item.x_Axis);
+ _colYAxis.Add((double)item.y_Axis);
+ _colZAxis.Add((double)item.z_Axis);
+ }
+ }
+
+ if (_colSubjectId.Count == _threshold)
+ {
+ // Build a record batch using the Fluent API
+ var recordBatch = new RecordBatch.Builder(_memoryAllocator)
+ .Append("SubjectId", false, col => col.Int32(array =>
array.AppendRange(_colSubjectId)))
+ .Append("ActivityLabel", false, col =>
col.String(array => array.AppendRange(_colActivityLabel)))
+ .Append("Timestamp", false, col => col.Int64(array =>
array.AppendRange(_colTimestamp)))
+ .Append("XAxis", false, col => col.Double(array =>
array.AppendRange(_colXAxis)))
+ .Append("YAxis", false, col => col.Double(array =>
array.AppendRange(_colYAxis)))
+ .Append("ZAxis", false, col => col.Double(array =>
array.AppendRange(_colZAxis)))
+ .Build();
+
+ _recordBatches.Add(recordBatch);
+
+ _colSubjectId.Clear();
+ _colActivityLabel.Clear();
+ _colTimestamp.Clear();
+ _colXAxis.Clear();
+ _colYAxis.Clear();
+ _colZAxis.Clear();
+ }
+ }
+ }
+
+ public async Task<bool> PersistData()
+ {
+ string time = DateTime.Now.ToString("yyyyMMdd_HHmmss");
Review comment:
```suggestion
```
This variable doesn't appear to be used.
##########
File path: csharp/examples/IoTExample/Model/SampleDataset.cs
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using System.Diagnostics;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.Arrow;
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Memory;
+
+namespace IoTExample
+{
+ public class SampleDataset
+ {
+ private int _size;
+ private readonly int _inputs;
+ private readonly int _capacity;
+ private readonly BlockingCollection<SensorData> _rows;
+
+ private readonly List<int> _colSubjectId;
+ private readonly List<string> _colActivityLabel;
+ private readonly List<long> _colTimestamp;
+ private readonly List<double> _colXAxis;
+ private readonly List<double> _colYAxis;
+ private readonly List<double> _colZAxis;
+
+ private readonly int _threshold = 1_000_000;
+ private readonly List<RecordBatch> _recordBatches;
+ private readonly MemoryAllocator _memoryAllocator;
+
+ public Dictionary<string, string> activityLabel = new
Dictionary<string, string>()
+ {
+ {"walking", "A"},
+ {"jogging", "B"},
+ {"stairs", "C"},
+ {"sitting", "D"},
+ {"standing", "E"},
+ {"typing", "F"},
+ {"teeth", "G"},
+ {"soup", "H"},
+ {"chips", "I"},
+ {"pasta", "J"},
+ {"drinking", "K"},
+ {"sandwich", "L"},
+ {"kicking", "M"},
+ {"catch", "O"},
+ {"dribbling", "P"},
+ {"writing", "Q"},
+ {"clapping", "R"},
+ {"folding", "S"},
+ };
+
+ public SampleDataset(int inputs, int capacity)
+ {
+ _inputs = inputs;
+ _capacity = capacity;
+ _rows = new BlockingCollection<SensorData>(_capacity);
+
+ _colSubjectId = new List<int>(capacity);
+ _colActivityLabel = new List<string>();
+ _colTimestamp = new List<long>();
+ _colXAxis = new List<double>();
+ _colYAxis = new List<double>();
+ _colZAxis = new List<double>();
+
+ _recordBatches = new List<RecordBatch>();
+ _memoryAllocator = new NativeMemoryAllocator(alignment: 64);
+ }
+
+ public void Produce()
+ {
+ Random rand = new Random();
+ bool success;
+
+ List<string> keyList = new List<string>(activityLabel.Keys);
+ int count = keyList.Count;
+
+ DateTime now = DateTime.Now;
+ long unixTime = ((DateTimeOffset)now).ToUnixTimeSeconds();
+
+ while (_size < _inputs)
+ {
+ string randomKey = keyList[rand.Next(count)];
+ string label = activityLabel[randomKey];
+
+ // generate missing values
+ if (rand.Next(10_000) == 9_999)
+ {
+ label = null;
+ }
+
+ success = _rows.TryAdd(new SensorData
+ {
+ subjectId = rand.Next(1000, 2001),
+ activityLabel = label,
+ timestamp = unixTime++,
+ x_Axis = rand.NextDouble(),
+ y_Axis = rand.NextDouble(),
+ z_Axis = rand.NextDouble(),
+ });
+
+ if (success)
+ {
+ //Console.WriteLine($"Enqueue Task 0");
+ _size++;
+ }
+ else
+ {
+ Console.WriteLine("Producing is blocked, percent completed
is: {0}%", Math.Round((double)_size / _inputs, 4) * 100);
+ }
+ }
+
+ _rows.CompleteAdding();
+ }
+
+ public async void Consume()
Review comment:
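This method doesn't need to be `async` since it doesn't `await` anything.
Also, avoid `async void` methods in general: an `async` method should return `Task`
(or `Task<T>`) whenever possible, because exceptions thrown from an `async void`
method cannot be observed or awaited by the caller.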
##########
File path: csharp/examples/IoTExample/Model/SampleDataset.cs
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using System.Diagnostics;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.Arrow;
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Memory;
+
+namespace IoTExample
+{
+ public class SampleDataset
+ {
+ private int _size;
+ private readonly int _inputs;
+ private readonly int _capacity;
+ private readonly BlockingCollection<SensorData> _rows;
Review comment:
Since we are using `async` methods, could we change this to use
[Channels](https://devblogs.microsoft.com/dotnet/an-introduction-to-system-threading-channels/)
instead? They are very similar to `BlockingCollection`, but Channels were designed with `async` in mind.
##########
File path: csharp/examples/IoTExample/Model/SampleDataset.cs
##########
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+using System;
+using System.IO;
+using System.Threading.Tasks;
+using System.Diagnostics;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using Apache.Arrow;
+using Apache.Arrow.Ipc;
+using Apache.Arrow.Memory;
+
+namespace IoTExample
+{
+ public class SampleDataset
+ {
+ private int _size;
+ private readonly int _inputs;
+ private readonly int _capacity;
+ private readonly BlockingCollection<SensorData> _rows;
+
+ private readonly List<int> _colSubjectId;
+ private readonly List<string> _colActivityLabel;
+ private readonly List<long> _colTimestamp;
+ private readonly List<double> _colXAxis;
+ private readonly List<double> _colYAxis;
+ private readonly List<double> _colZAxis;
+
+ private readonly int _threshold = 1_000_000;
+ private readonly List<RecordBatch> _recordBatches;
+ private readonly MemoryAllocator _memoryAllocator;
+
+ public Dictionary<string, string> activityLabel = new
Dictionary<string, string>()
+ {
+ {"walking", "A"},
+ {"jogging", "B"},
+ {"stairs", "C"},
+ {"sitting", "D"},
+ {"standing", "E"},
+ {"typing", "F"},
+ {"teeth", "G"},
+ {"soup", "H"},
+ {"chips", "I"},
+ {"pasta", "J"},
+ {"drinking", "K"},
+ {"sandwich", "L"},
+ {"kicking", "M"},
+ {"catch", "O"},
+ {"dribbling", "P"},
+ {"writing", "Q"},
+ {"clapping", "R"},
+ {"folding", "S"},
+ };
+
+ public SampleDataset(int inputs, int capacity)
+ {
+ _inputs = inputs;
+ _capacity = capacity;
+ _rows = new BlockingCollection<SensorData>(_capacity);
+
+ _colSubjectId = new List<int>(capacity);
+ _colActivityLabel = new List<string>();
+ _colTimestamp = new List<long>();
+ _colXAxis = new List<double>();
+ _colYAxis = new List<double>();
+ _colZAxis = new List<double>();
+
+ _recordBatches = new List<RecordBatch>();
+ _memoryAllocator = new NativeMemoryAllocator(alignment: 64);
+ }
+
+ public void Produce()
+ {
+ Random rand = new Random();
+ bool success;
+
+ List<string> keyList = new List<string>(activityLabel.Keys);
+ int count = keyList.Count;
+
+ DateTime now = DateTime.Now;
+ long unixTime = ((DateTimeOffset)now).ToUnixTimeSeconds();
+
+ while (_size < _inputs)
+ {
+ string randomKey = keyList[rand.Next(count)];
+ string label = activityLabel[randomKey];
+
+ // generate missing values
+ if (rand.Next(10_000) == 9_999)
+ {
+ label = null;
+ }
+
+ success = _rows.TryAdd(new SensorData
+ {
+ subjectId = rand.Next(1000, 2001),
+ activityLabel = label,
+ timestamp = unixTime++,
+ x_Axis = rand.NextDouble(),
+ y_Axis = rand.NextDouble(),
+ z_Axis = rand.NextDouble(),
+ });
+
+ if (success)
+ {
+ //Console.WriteLine($"Enqueue Task 0");
Review comment:
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]