lquerel commented on code in PR #369:
URL: https://github.com/apache/arrow-site/pull/369#discussion_r1234254373


##########
_posts/2023-06-15-our-journey-at-f5-with-apache-arrow-part-2.md:
##########
@@ -0,0 +1,199 @@
+---
+layout: post
+title: "Our journey at F5 with Apache Arrow (part 2)"
+date: "2023-06-15 00:00:00"
+author: Laurent Quérel
+categories: [application]
+---
+<!--
+{% comment %}
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+{% endcomment %}
+-->
+
+In the previous [article](https://arrow.apache.org/blog/2023/04/11/our-journey-at-f5-with-apache-arrow-part-1/), we discussed our use of Apache Arrow within the context of the OpenTelemetry project. We investigated various techniques to maximize the efficiency of Apache Arrow, aiming to find the optimal balance between data compression ratio and queryability. The compression results speak for themselves, with improvements ranging from 1.5x to 5x over the original OTLP protocol. In this article, we will delve into three techniques that have enabled us to enhance both the compression ratio and memory usage of Apache Arrow buffers within the current version of the [OTel Arrow protocol](https://github.com/f5/otel-arrow-adapter).
+
+The first technique we'll discuss aims to optimize schemas in terms of memory usage. As you'll see, the gains can be substantial, potentially halving memory usage in certain cases. The second section will delve more deeply into the various approaches that can be used to handle recursive schema definitions. Lastly, we'll emphasize that the design of your schema(s), coupled with the sorts you can apply at the record level, plays a pivotal role in maximizing the benefits of Apache Arrow and its columnar representation.
+
+## Handling dynamic and unknown data distributions
+
+In certain contexts, the comprehensive definition of an Arrow schema can end up being overly broad and complex in order to cover all the cases you intend to represent in columnar form. However, as is often the case with complex schemas, only a subset of this schema will actually be utilized for a specific deployment. Similarly, it's not always possible to determine the optimal dictionary encoding for one or more fields in advance. Employing a broad and very general schema that covers all cases is usually more memory-intensive. This is because, for most implementations, a column without values still consumes memory space. Likewise, a column with dictionary encoding that indexes values with a uint64 will occupy eight times more memory than the same column with a dictionary encoding based on a uint8.
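+
+As a rough illustration (a sketch written for this discussion, with an arbitrary row count; only the index-buffer cost is considered), the index buffer of a dictionary-encoded column grows linearly with the index width:
+
+```go
+var (
+  // Two dictionary encodings of the same string column; only the index width differs.
+  nameDictU8  = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint8, ValueType: arrow.BinaryTypes.String}
+  nameDictU64 = &arrow.DictionaryType{IndexType: arrow.PrimitiveTypes.Uint64, ValueType: arrow.BinaryTypes.String}
+)
+
+// For a record with 1,000,000 rows, the index buffers alone cost roughly
+// 1 MB with uint8 indices versus 8 MB with uint64 indices, regardless of
+// how many distinct values the dictionary itself holds.
+```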
+
+To illustrate this more concretely, let's consider an OTel collector 
positioned at the output of a production environment, receiving a telemetry 
data stream produced by a large and dynamic set of servers. Invariably, the 
content of this telemetry stream will change in volume and nature over time. 
It's challenging to predict the optimal schema in such a scenario, and it's 
equally difficult to know in advance the distribution of a particular attribute 
of the telemetry data passing through this point.
+
+To optimize such scenarios, we have adopted an intermediate approach that we have named **dynamic Arrow schema**, aiming to gradually adapt the schema based on the observed data. The general principle is relatively simple. We start with a general schema defining the maximum envelope of what should be represented. Some fields of this schema will be declared optional, while other fields will be encoded with multiple possible options depending on the observed distribution. In theory, this principle can be applied to other types of transformations (e.g., recursive column creation), but we will let your imagination explore these other options. So if you encounter data streams where certain fields are not utilized, some union variants remain unused, and/or the value distribution of a field cannot be determined a priori, it may be worthwhile to invest time in implementing this model. This can lead to improved efficiency in terms of compression ratio, memory usage, and processing speed.
+
+The following Go Arrow schema definition provides an example of such a schema, 
instrumented with a collection of annotations. These annotations will be 
processed by an enhanced Record Builder, equipped with the ability to 
dynamically adapt the schema. The structure of this system is illustrated in 
Figure 1.
+
+```go
+var (
+  // Arrow schema for the OTLP Arrow Traces record (without attributes, links, and events).
+  TracesSchema = arrow.NewSchema([]arrow.Field{
+    {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
+    {Name: constants.Resource, Type: arrow.StructOf([]arrow.Field{
+      {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
+      {Name: constants.SchemaUrl, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
+      {Name: constants.DroppedAttributesCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
+    }...), Nullable: true},
+    {Name: constants.Scope, Type: arrow.StructOf([]arrow.Field{
+      {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Metadata: acommon.Metadata(acommon.DeltaEncoding), Nullable: true},
+      {Name: constants.Name, Type: arrow.BinaryTypes.String, Metadata: acommon.Metadata(acommon.Dictionary8), Nullable: true},
+      {Name: constants.Version, Type: arrow.BinaryTypes.String, Metadata: acommon.Metadata(acommon.Dictionary8), Nullable: true},
+      {Name: constants.DroppedAttributesCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
+    }...), Nullable: true},
+    {Name: constants.SchemaUrl, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
+    {Name: constants.StartTimeUnixNano, Type: arrow.FixedWidthTypes.Timestamp_ns},
+    {Name: constants.DurationTimeUnixNano, Type: arrow.FixedWidthTypes.Duration_ms, Metadata: schema.Metadata(schema.Dictionary8)},
+    {Name: constants.TraceId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}},
+    {Name: constants.SpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}},
+    {Name: constants.TraceState, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
+    {Name: constants.ParentSpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}, Nullable: true},
+    {Name: constants.Name, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8)},
+    {Name: constants.KIND, Type: arrow.PrimitiveTypes.Int32, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
+    {Name: constants.DroppedAttributesCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
+    {Name: constants.DroppedEventsCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
+    {Name: constants.DroppedLinksCount, Type: arrow.PrimitiveTypes.Uint32, Nullable: true},
+    {Name: constants.Status, Type: arrow.StructOf([]arrow.Field{
+      {Name: constants.StatusCode, Type: arrow.PrimitiveTypes.Int32, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
+      {Name: constants.StatusMessage, Type: arrow.BinaryTypes.String, Metadata: schema.Metadata(schema.Dictionary8), Nullable: true},
+    }...), Nullable: true},
+  }, nil)
+)
+```
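+
+For intuition, these annotations are nothing more than plain Arrow field metadata that the enhanced Record Builder knows how to interpret. A possible shape of the helper is sketched below; the constant names and the function body are hypothetical and the actual otel-arrow-adapter implementation may differ:
+
+```go
+const (
+  Dictionary8   = "dictionary8"  // start with a uint8 dictionary index
+  Dictionary16  = "dictionary16" // start with a uint16 dictionary index
+  DeltaEncoding = "delta"        // delta-encode values before writing them
+)
+
+// Metadata turns encoding hints into standard Arrow field metadata.
+func Metadata(hints ...string) arrow.Metadata {
+  values := make([]string, len(hints))
+  for i := range values {
+    values[i] = "true"
+  }
+  return arrow.NewMetadata(hints, values)
+}
+```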
+
+In this example, Arrow field-level properties and metadata are employed to designate when a field is optional (Nullable: true) or to specify the minimal dictionary encoding applicable to a particular field (metadata Dictionary8/16/…). Now let's imagine utilizing this schema in a straightforward deployment, wherein only a handful of fields are actually in use and the cardinality of most dictionary-encoded fields is low (i.e., below 2^8). Ideally, we'd want a system capable of dynamically constructing the following simplified schema, which, in essence, is a strict subset of the original schema.
+
+```go
+var (
+  // Simplified schema definition generated by the Arrow Record encoder based on
+  // the data observed.
+  TracesSchema = arrow.NewSchema([]arrow.Field{
+    {Name: constants.ID, Type: arrow.PrimitiveTypes.Uint16, Nullable: true},
+    {Name: constants.StartTimeUnixNano, Type: arrow.FixedWidthTypes.Timestamp_ns},
+    {Name: constants.TraceId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 16}},
+    {Name: constants.SpanId, Type: &arrow.FixedSizeBinaryType{ByteWidth: 8}},
+    {Name: constants.Name, Type: &arrow.DictionaryType{
+      IndexType: arrow.PrimitiveTypes.Uint8,
+      ValueType: arrow.BinaryTypes.String,
+    }},
+    {Name: constants.KIND, Type: &arrow.DictionaryType{
+      IndexType: arrow.PrimitiveTypes.Uint8,
+      ValueType: arrow.PrimitiveTypes.Int32,
+    }, Nullable: true},
+  }, nil)
+)
+```
+
+Additionally, we desire a system capable of automatically adapting the aforementioned schema if, in future batches, it encounters new fields or existing fields whose cardinality exceeds the range of the current dictionary index. In extreme scenarios, if the cardinality of a specific field surpasses a certain threshold, we would prefer the system to automatically fall back to the non-dictionary representation (a mechanism referred to as dictionary overflow). That is precisely what we will elaborate on in the remainder of this section.
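+
+To make the overflow rule concrete, here is a minimal sketch of the decision logic (a hypothetical helper written for this post, not the actual OTel Arrow implementation; the overflow threshold is an arbitrary knob):
+
+```go
+// nextIndexType picks the smallest dictionary index type able to address the
+// observed number of distinct values, or returns nil when the field should
+// fall back to its plain, non-dictionary representation (dictionary overflow).
+func nextIndexType(cardinality, overflowThreshold uint64) arrow.DataType {
+  switch {
+  case cardinality > overflowThreshold:
+    return nil // overflow: revert to the native value type
+  case cardinality <= math.MaxUint8:
+    return arrow.PrimitiveTypes.Uint8
+  case cardinality <= math.MaxUint16:
+    return arrow.PrimitiveTypes.Uint16
+  case cardinality <= math.MaxUint32:
+    return arrow.PrimitiveTypes.Uint32
+  default:
+    return arrow.PrimitiveTypes.Uint64
+  }
+}
+```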
+
+An overview of the different components and events used to implement this approach is depicted in Figure 1.
+
+<figure style="text-align: center;">
+  <img src="{{ site.baseurl 
}}/img/journey-apache-arrow/adaptive-schema-architecture.svg" width="100%" 
class="img-responsive" alt="Fig 1: Adaptive Arrow schema architecture 
overview.">
+  <figcaption>Fig 1: Adaptive Arrow schema architecture overview.</figcaption>
+</figure>
+
+The overall Adaptive Arrow schema component takes a data stream segmented into batches and produces one or more streams of Arrow Records (one schema per stream). Each of these records is defined with an Arrow schema, which is based both on the annotated Arrow schema and on the shape of the fields observed in the incoming data.
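+
+A hedged sketch of what such a component's contract could look like (the names are invented for this post to make the input/output shape explicit; the real otel-arrow-adapter exposes a different API):
+
+```go
+// AdaptiveSchemaEncoder turns successive batches of telemetry items into
+// Arrow Records whose schema may evolve from one call to the next.
+type AdaptiveSchemaEncoder[T any] interface {
+  // Encode consumes the next batch of the incoming stream and produces the
+  // corresponding Arrow Record. The Record's schema is derived from the
+  // annotated reference schema plus everything observed so far, so it may
+  // change between calls (new optional fields, wider dictionary indices, ...).
+  Encode(batch []T) (arrow.Record, error)
+
+  // Schema returns the schema currently in use for this stream.
+  Schema() *arrow.Schema
+}
+```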
+
+More specifically, the process of the Adaptive Arrow schema component consists of four main phases:
+
+**Initialization phase**
+
+During the initialization phase, the Arrow Record Encoder reads the annotated Arrow schema (i.e., the reference schema) and generates a collection of transformations. When these transformations are applied to the reference schema, they yield the first minimal Arrow schema that adheres to the constraints expressed by these annotations. In this initial iteration, all optional fields are eliminated, and all dictionary-encoded fields are configured to use the smallest encoding defined by their annotation. These transformations form a tree, reflecting the structure of the reference schema.
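+
+For intuition, a transformation node could look roughly like this (a simplified sketch with hypothetical names, not the data structure actually used by the adapter):
+
+```go
+// fieldTransform mirrors one field of the reference schema and records how
+// that field must be rewritten the next time a schema is generated.
+type fieldTransform struct {
+  name      string
+  removed   bool             // optional field dropped until a value is observed
+  indexType arrow.DataType   // current dictionary index type; nil if not dictionary-encoded
+  children  []fieldTransform // one child per nested field, mirroring the reference schema
+}
+```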

Review Comment:
   Made a change to clarify a bit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
