#general


@diana.arnos: Hello again! :sweat_smile: We are trying some production environment setups and I'm having trouble identifying the optimal configuration. Can you point me to some resources? I also need to find out how much storage I need to setup for the Controller, but I couldn't see anything related to that in the docs. I tried running with 1G (the default value) and 10G, but it wasn't enough. Segments are uploaded to Controller storage, right? On the thread, my schema, table configs and helm chart configs.
  @diana.arnos: Table schema: ```{ "schemaName": "responseCount", "primaryKeyColumns": ["responseId"], "dimensionFieldSpecs": [ { "name": "responseId", "dataType": "STRING" }, { "name": "formId", "dataType": "STRING" }, { "name": "channelId", "dataType": "STRING" }, { "name": "channelPlatform", "dataType": "STRING" }, { "name": "companyId", "dataType": "STRING" }, { "name": "submitted", "dataType": "BOOLEAN" }, { "name": "deleted", "dataType": "BOOLEAN" } ], "dateTimeFieldSpecs": [ { "name": "recordTimestamp", "dataType": "STRING", "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ss.SSSZ", "granularity": "1:MILLISECONDS" }, { "name": "createdAt", "dataType": "STRING", "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ss.SSSZ", "granularity": "1:MILLISECONDS" }, { "name": "deletedAt", "dataType": "STRING", "format": "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ss.SSSZ", "granularity": "1:MILLISECONDS" } ] }``` Table configs: ```{ "tableName": "responseCount", "tableType": "REALTIME", "segmentsConfig": { "schemaName": "responseCount", "timeColumnName": "recordTimestamp", "replication": "1", "replicasPerPartition": "2" }, "upsertConfig": { "mode": "PARTIAL", "partialUpsertStrategies": { "deleted": "OVERWRITE", "formId": "OVERWRITE", "recordTimestamp": "OVERWRITE", "channelId": "OVERWRITE", "channelPlatform": "OVERWRITE", "companyId": "OVERWRITE", "submitted": "OVERWRITE", "createdAt": "OVERWRITE", "deletedAt": "OVERWRITE" } }, "routing": { "instanceSelectorType": "strictReplicaGroup" }, "tableIndexConfig": { "loadMode": "MMAP", "nullHandlingEnabled": true, "streamConfigs": { "streamType": "kafka", "stream.kafka.topic.name": "[redacted]", "stream.kafka.broker.list": "[redacted]", "stream.kafka.consumer.type": "lowlevel", "stream.kafka.consumer.prop.auto.offset.reset": "smallest", "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory", "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder", "realtime.segment.flush.threshold.rows": "0", "realtime.segment.flush.threshold.time": "24h", "realtime.segment.flush.segment.size": "100M" } }, "tenants": {}, "metadata": {} }``` Helm chart config: ```controller: jvmOpts: "-Xms1G -Xmx4G -javaagent:/opt/pinot/etc/jmx_prometheus_javaagent/jmx_prometheus_javaagent-0.12.0.jar=5556:/opt/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml" persistence: size: 100G ingress: v1beta1: enabled: true annotations: : internal tls: { } path: / hosts: - resources: limits: cpu: 1 memory: 4G requests: cpu: 1 memory: 4G broker: jvmOpts: "-Xms1G -Xmx4G -javaagent:/opt/pinot/etc/jmx_prometheus_javaagent/jmx_prometheus_javaagent-0.12.0.jar=5556:/opt/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml" ingress: v1beta1: enabled: true annotations: : internal tls: { } path: / hosts: - resources: limits: cpu: 1 memory: 4G requests: cpu: 1 memory: 4G server: replicaCount: 3 jvmOpts: "-Xms2G -Xmx7G -javaagent:/opt/pinot/etc/jmx_prometheus_javaagent/jmx_prometheus_javaagent-0.12.0.jar=5556:/opt/pinot/etc/jmx_prometheus_javaagent/configs/pinot.yml" resources: limits: cpu: 4 memory: 16G requests: cpu: 4 memory: 16G```
  @diogo.baeder: I think it will depend a lot on the amount of data you have, probably. But I can provide what we started using on the company I work for, for ~100M rows where each has ~5KB of data: • 1 node for Kafka, 8GB RAM, 100GB storage space • 1 node for ZooKeeper, 8GB RAM, 100GB storage space • 2 node for Pinot Controllers, 8GB RAM, 20GB storage space • 1 node for Pinot Broker, 16GB RAM, 20GB storage space • 2 nodes for Pinot Server, 16GB RAM, 1TB storage space
  @dlavoie: Do you have deepstore configured with controllers?
  @dlavoie: Not having deepstore means the controller will use the local FS to store segments. If you have multiple controllers, it means segments will be inconsistenly stored in different controllers. Local FS with controllers only works if you are pointing to something like NFS. Recommended approach is to go with any of the supported deep store. This will offload all long term storage outside of the controller and you shouldn’t observe massive GB usage on them.
  @dlavoie: If you are running single controller, then local FS is good enough if you can afford the local storage and the non HA constraint.
  @diogo.baeder: Agreed, and I learned that the hard way :smile:
  @diana.arnos: We will have ~800M rows with ~300 bytes of data We don't have deep storage, but we have only 1 Controller Maybe due to the required amount of data, we need to set up deep storage
  @dlavoie: Controller will have to storage everything that is stored by the servers.
  @dlavoie: Is suposed to be the cold copy of servers in case a server disk is lost
  @diana.arnos: Of course, that makes sense :woman-facepalming: I need to bump up server's storage
  @dlavoie: I would not recommend going into production without deep store configured for servers.
  @dlavoie: It’s your safety policy
  @diana.arnos: Yes, I will setup those
  @mayanks: Also would need replication for controller/broker/server in production
  @diogo.baeder: @mayanks where do aggregations run, after the raw data is fetched? Is it on the brokers?
  @diana.arnos: I've already setup replication. Will the deep storage be used by a realtime table in the same way it happens for offline ones? We only have a realtime table and we didn't setup a realtime to offline job (to be honest, we don't know if we need this)
  @diogo.baeder: For realtime tables there's always at least one "consuming" segment (which is open for writes) and then the "committed" segments, which are flushed to the deep storage. Kinda "hot" and "cold" segments, in a sense. Once a consuming segment reaches a defined threshold (by table configuration), it gets committed and flushed to the deep storage.
  @diogo.baeder: One mistake I did when my company migrated our Pinot cluster to another AWS region was that I didn't stop and commit the consuming segments, and I ended up losing the "hot" data - I still kept the data from the deep storage though.
  @diana.arnos: That's a good advice indeed :thinking_face: How am I supposed to stop a table and commit the consuming segments? Would it happen automatically if I disable a table?
  @mayanks: @diogo.baeder For a query, servers do aggregation on data they host, and broker does a final merge.
  @mayanks: @diana.arnos Are you trying to migrate table from one AWS region to another?
  @diogo.baeder: @mayanks nice, thanks! :slightly_smiling_face:
  @diana.arnos: No, we are setting up our first prod deployment
  @xiaobing: to my knowledge, disabling table stops the consumption, but doesn’t force to commit the consuming segments. But if the data is still in Kafka, restarting the table should resume consumption from last commit offset (i.e. the end of last commit segments).
  @mayanks: Ok, in case of first prod deployment, why do you need to stop consumption @diana.arnos? In Diogo’s case, it seems like there was a migration involved
  @diogo.baeder: I think there's some noise going on, probably; I said about migration just because we had to do it on our company, just as an example of what happens to segments being consumed or having been committed already. Diana's case doesn't seem to involve any of that at the moment, but I think she asked just out of curiosity, in case she falls into the same situation. Am I correct, @diana.arnos?
  @diogo.baeder: And sorry for mentioning our migration, you can blame this confusion on me :smile:
  @mayanks: All good @diogo.baeder, thanks for your help, as always.
  @diogo.baeder: np :blush:
@saumya2700: :wave: Hi everyone! I am new to apache pino, I am using it fro relatime data ingestion from kafka topic, we are using confluent kafka and schema registry and avro schema. I am able connect kafka topic as my table is successfully created and its in healthy state, but query is not showing any records. how can we check that it has some issues in consuming side. In debug table I cannot see any errors.
  @francois: Hi :slightly_smiling_face: Have you check server logs for any issues ?
  @saumya2700: where to check for server logs
  @francois: It depends on the way you launch the server.
  @saumya2700: we launched it through helm
  @saumya2700: Got the logs : [Consumer clientId=f83d920f-6e8d-465b-bd83-2552132f241e2, groupId=tscalrecord7_REALTIME_1647430665510_0] Bootstrap broker XXXXXX:9092 (id: -1 rack: null) disconnected
  @francois: Did you get any error ?
  @mayanks: Check the table debug api in swagger to see if it bubbles up any problems
  @mayanks: If there are events in Kafka stream, likely messages are being skipped due to decoding error or schema mismatch or table config issues (eg time unit mismatch)
@nizar.hejazi: Hi everyone, can Apache Pinot supports a list of lists data type (a multi-valued column where values are also lists)? Thanks.
  @mayanks: Pinot supports JSON for complex data types, you can use that?
  @nizar.hejazi: yes, I can use Json but this would results in more complicated queries (we generate queries on the fly) in addition to the performance hit from using Json.
  @mayanks: What’s the performance hit with JSON you are referring to?
  @mayanks: There’s JSON indexing that performs very well.
  @nizar.hejazi: From : It’s not performant to group by or order by a JSON field, because `JSON_EXTRACT_SCALAR` is needed to extract the values in the GROUP BY and ORDER BY clauses, which invokes the function evaluation.
  @nizar.hejazi: I was looking for something similar to Presto’s support for ARRAY that can have ROW where ROW again can have ARRAY. Ex: `select ARRAY[ROW(ARRAY[1,2], ARRAY['a','b'])];`
  @mayanks: We don’t have that yet (perhaps file a GH issue with request). For performance, I’d recommend trying it out, as it depends on a lot of factors including your SLA requirement, query selectivity etc
  @nizar.hejazi: thanks
@mndoping: @mndoping has joined the channel
@grace.lu: Hi team, I would like to get some suggestions about what does the pinot batch ingestion story look like in Production environment. Ideally we want to use spark cluster mode for ingestion in production, but we ran into lots of issue when submitting job in distributed fashion to our production spark clusters on yarn. Currently we only have spark local mode and pinot standalone ingestion working for batch data, but we are worried this will not be sustainable for ingesting larger production tables. What do people generally use for ingesting pinot data in production? Asking because I don’t see too much documentation and discussion around using spark generation job with yarn master and cluster deploy mode. Besides, we are at hadoop 2.9.1, spark 2.4.6 on yarn, pinot 0.9.2, also interested to know if anyone has successfully set up cluster mode batch ingestion with similar hadoop/spark environment:eyes:.
  @ken: Hi @grace.lu - we run Flink jobs to build per-segment CSV files (saved in HDFS), then run the Pinot Hadoop MapReduce segment generation job to build segments. Both of these workflows are running on Yarn. The resulting segments are stored in HDFS, in our configured “deep store” location. Then we run an admin job (single server) to do a metadata push of the new segments. This has been in production for a few months. We use Airflow to orchestrate everything. Just FYI, now that support has been added for a Flink segment writer “sink”, we’ll be able to skip the intermediate CSV file generation step.
  @grace.lu: @ken Thanks a lot for the info! Would you mind sharing what’s your flink version and hadoop version?
  @ken: Flink 1.13.2 (transitioning to 1.14.3) and Hadoop 2.8.5
  @saravana6m: Thanks @ken for the details. when you say metadata push of the new segments, do we have a way to get only the metadata for each segment and make a call to Pinot? thanks for your attention.
  @ken: A “metadata push” is a kind of data ingestion job that you submit to the Pinot admin tool. See for details. You specify a directory containing segments, and information about the controller, the table name, etc. via the ingestion job spec (a yaml file).
  @saravana6m: This is helpful Ken. Thanks for the reference @ken! :slightly_smiling_face:
@pranaybsankpal2050: Can someone guide me on where to start reading the code. I want to understand architecture and codebase as well.
  @mayanks: For understanding there’s some good videos you can start with
  @pranaybsankpal2050: Hi Mayank, can you please share the links?
  @mayanks:
  @mayanks: @pranaybsankpal2050 you can find many videos covering different topics for Apache Pinot here ^^
  @npawar: to start reading code, quickstart is a good starting point. you’ll be able to read the ingestion and also query side with breakpoints.
@ameyacm: @ameyacm has joined the channel

#random


@mndoping: @mndoping has joined the channel
@ameyacm: @ameyacm has joined the channel

#troubleshooting


@francois: Hi. I’ve found an interesting thing but not sure it’s a good practice :confused: I’m still in POC and my kafka have crashed -> So my new message offset are reseted at 0. But pinot keep looking for older offsets. I’ve searched and found that It was not possible to reset an offset on pinot side. I’ve tried several things un-sucessfully before doing the following : 1 Disable the realtime table 2 Find out in zookeeper the consuming segments 3 Set the offset values to 0 for the segment of step 2 4 Enable the table And the consumption resume as I expected without data loss and only a small downtime. Is that a good practice or a least a workaround before new dev around this topic as read in the designs doc ? :wink:
  @mayanks: Thanks for sharing @francois, yes if the upstream kafka offsets become inconsistent we run into these issues. In your specific case (kafka crash), you could unblock by doing so, but note that Pinot won’t know if there’s any data loss or duplication. I think we have an issue being discussed where we want to at least allow the consumption to begin without having to do the ZK edits. cc: @navi.trinity @npawar
  @ssubrama: @francois for now, the only thing you can do is to drop your table and restart consumption. If you need the data then you may have to save away the segments and upload them again (and that is a little tricky as well). We are working on a design to overcome such issues, the design is under review. cc: @moradi.sajjad
  @navina: @mayanks Yes. It is similar to the offset out of range exception @francois: In your case, is your older invalid message no longer in the kafka topic or is it still present and you would like to reset to a different offset?
  @mayanks: I think the recent cases are we have seen are issues where users understand and are OK with temporary data loss from Pinot, but want a way to simply restart the consumption on best effort case, without having to delete the entire table in Pinot.
  @francois: @navina In my case kafka is reseting his offset from 0. So older messages are gone :slightly_smiling_face:
  @navina: @francois I see. Ok. in that case, the solution should be similar to the one dealing with OOR. I think by default, the kafka consumer's `reset.offset` is set to `LATEST`. You could override it to use `EARLIEST` . We are still discussing if this should be a config driven behavior or use explicit kafka apis to handle resets. Please follow that issue for progress on this work!
  @npawar: fyi @francois this is the proposed solution which will help in this case
  @npawar: i dont think it will help in your case though.. even if we make kafka consumer reset to earliest, the pinot consumer will filter out the records because they’re out of range from pinot perspective. in your case, manually editing zk is one option. deleting and restarting is safest option. We should add an API to help with such resets
  @navina: ok. so this is a manual "re-position" use-case and cannot be handled as the OOR case
@stuart.millholland: So playing with the Dynamic tables section on the Trino Pinot connector . I'm finding that in order to filter on a column it has to be in the SELECT portion within the double quotes. I'm wondering if the documentation needs to be adjusted there, or I could also be doing something wrong.
  @stuart.millholland: Is this what the document should say? ```SELECT * FROM pinot.default."SELECT col3, col4, MAX(col1), COUNT(col2) FROM pinot_table GROUP BY col3, col4" WHERE col3 IN ('FOO', 'BAR') AND col4 > 50 LIMIT 30000```
  @mayanks: @elon.azoulay to confirm. Definitely +1 on fixing the doc if it needs to.
  @elon.azoulay: Thanks for catching this! Agreed, will update.
  @stuart.millholland: Awesome, ty!
  @mayanks: That was quick (as always), thanks @elon.azoulay
@stuart.millholland: Another question on the Trino Pinot connector. We have a schema definition, and here are a couple sample fields:
  @stuart.millholland: So this query works fine (in both data grip and shelling into my trino coordinator pod) ```SELECT * FROM pinot.default." SELECT country, account_type, COUNT(1) row_count FROM immutable_unified_events WHERE tenant_name = 'tenant1' AND user_is_admin = true --AND account_type = 'Internal' GROUP BY country, account_type LIMIT 50000" WHERE country = 'Congo'```
  @stuart.millholland: Notice the account_type is commented out
  @stuart.millholland: This query returns nothing: ```SELECT * FROM pinot.default." SELECT country, account_type, COUNT(1) row_count FROM immutable_unified_events WHERE tenant_name = 'tenant1' AND user_is_admin = true AND account_type = 'Internal' GROUP BY country, account_type LIMIT 50000" WHERE country = 'Congo'```
  @stuart.millholland: Yet this query does: ```SELECT * FROM pinot.default." SELECT country, account_type, COUNT(1) row_count FROM immutable_unified_events WHERE tenant_name = 'tenant1' AND user_is_admin = true GROUP BY country, account_type LIMIT 50000" WHERE country = 'Congo' AND account_type = 'Internal'```
  @stuart.millholland: In my schema definition both tenant_name and account_type are defined exactly the same.
@stuart.millholland: { "name": "subject", "dataType": "STRING" },
@stuart.millholland: { "name": "account_type", "dataType": "STRING" },
@stuart.millholland: I'm using DataGrip to explore the linked tables and I noticed the data type for subject is noted as varchar(max) and for account_type it's array(0). What that means is currently I can't filter on account_type inside of the quotes since it's considered an array
@stuart.millholland: Any ideas on why the connector decided one field was a varchar(max) and the other was an array(0)?
@stuart.millholland: What's even more interesting is querying the information schema, the metadat sees both of these as varchar, so it could be a data grip issue.
@elon.azoulay: I can't see the full schema, is it an mv column?
  @aaron.weiss: account_type is a single-value column in the schema
@mndoping: @mndoping has joined the channel
@facundo.bianco: Hi All :wave:, I'm trying to configure a date format like this "_2020-12-31T19:59:21.522-0400_" and created table-schema.json as ``` "dateTimeFieldSpecs": [{ "name": "timestampCustom", "dataType": "STRING", "format" : "1:MILLISECONDS:SIMPLE_DATE_FORMAT:yyyy-MM-dd'T'HH:mm:ss.SSZZ", "granularity": "1:MILLISECONDS" }]``` Table is generated successfully but POST command returns > {"code":500,"error":"Caught exception when ingesting file into table: foo_OFFLINE. null"} I discovered is related to date format, could you kindly indicate how should it be? I used to generate the custom format. Thanks in advance!
  @xiaobing: looks like you had a typo. maybe try `SSSZ` instead? as noted in the website.
  @mayanks: @facundo.bianco ^^
@ameyacm: @ameyacm has joined the channel
@grace.lu: Hi team, we ran into lots of issue when setting up spark ingestion job with Yarn. The latest issue we saw is that the application master reported the following error after the job is submitted to the cluster and no resources can be assigned to the job: ```Exception in thread "main" java.lang.NoSuchMethodError: org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils.convertToProtoFormat(Lorg/apache/hadoop/yarn/api/records/ExecutionType;)Lorg/apache/hadoop/yarn/proto/YarnProtos$ExecutionTypeProto; at org.apache.hadoop.yarn.api.records.impl.pb.ExecutionTypeRequestPBImpl.setExecutionType(ExecutionTypeRequestPBImpl.java:73)``` We wonder if pinot has also introduced this class in its dependencies and if it is conflicted with the library in our hadoop cluster itself? We are at spark 2.4.6, hadoop 2.9.1, pinot 0.9.2, and seems like pinot 0.9.2 is built with hadoop2.7.0 and spark 2.4.0, have we tested the compatible spark/hadoop version for running ingestion jobs?
  @grace.lu: for reference, the command we ran for submitting jobs: ```${SPARK_HOME}/bin/spark-submit --class org.apache.pinot.tools.admin.PinotAdministrator \ --master yarn \ --deploy-mode cluster \ --conf "spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins -Dlog4j2.configurationFile=${PINOT_DISTRIBUTION_DIR}/conf/pinot-ingestion-job-log4j2.xml" \ --conf "spark.driver.extraClassPath=${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar:${PLUGINS_CLASSPATH}" \ --files /mnt/data/home/grace/sparkIngestionJobSpec.yaml \ ${PINOT_DISTRIBUTION_DIR}/lib/pinot-all-${PINOT_VERSION}-jar-with-dependencies.jar \ LaunchDataIngestionJob \ -jobSpecFile sparkIngestionJobSpec.yaml```
  @mayanks: @xiangfu0 any suggestions?
  @xiangfu0: I would suggest to package all the required pinot jars together into a fat jar and use that
  @grace.lu: Is `pinot-all-0.9.2-jar-with-dependencies.jar` already a fat jar? Are you suggesting merging all the jars (or selected jars) in the plugins folders with pinot-all-0.9.2-jar-with-dependencies.jar and removing the `spark.driver.extraJavaOptions=-Dplugins.dir=${PINOT_DISTRIBUTION_DIR}/plugins` and `spark.driver.extraClassPath=${PLUGINS_CLASSPATH}` ?

#pinot-k8s-operator


@nizar.hejazi: @nizar.hejazi has joined the channel

#pinot-dev


@nizar.hejazi: @nizar.hejazi has joined the channel

#community


@nizar.hejazi: @nizar.hejazi has joined the channel

#announcements


@nizar.hejazi: @nizar.hejazi has joined the channel

#presto-pinot-connector


@nizar.hejazi: @nizar.hejazi has joined the channel

#multi-region-setup


@nizar.hejazi: @nizar.hejazi has joined the channel

#pinot-perf-tuning


@nizar.hejazi: @nizar.hejazi has joined the channel

#getting-started


@mndoping: @mndoping has joined the channel
@ameyacm: @ameyacm has joined the channel

#releases


@nizar.hejazi: @nizar.hejazi has joined the channel

#flink-pinot-connector


@nizar.hejazi: @nizar.hejazi has joined the channel

#complex-type-support


@nizar.hejazi: @nizar.hejazi has joined the channel

#pinot-trino


@nizar.hejazi: @nizar.hejazi has joined the channel
--------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@pinot.apache.org For additional commands, e-mail: dev-h...@pinot.apache.org

Reply via email to