KKcorps commented on issue #11969:
URL: https://github.com/apache/pinot/issues/11969#issuecomment-1815761117

   I tried both 1.0.0 and master but was unable to reproduce this issue on my end. 
   
   I noticed that the MV column has around 3000 values per row, so I took that into account as well when publishing the data.
   
   Here's the code I am using to generate the data:
   
   ```java
   public class DeviceEventPublisher {
   
     private static final Random random = new Random();
     private static final String KAFKA_TOPIC = "mv_ingestion_repro";
     private static final String BOOTSTRAP_SERVERS = "localhost:19092"; // 
Update with your Kafka server details
     public static final int NUM_EVENTS_TO_PUBLISH = 30000;
     public static final int MV_ARRAY_LENGTH = 3203;
     private KafkaProducer<String, String> producer;
   
     public DeviceEventPublisher() {
       Properties properties = new Properties();
       properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
BOOTSTRAP_SERVERS);
       properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
       properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName());
       this.producer = new KafkaProducer<>(properties);
     }
   
     public void publishEvent(DeviceEvent event) {
       try {
         String jsonData = JsonUtils.objectToString(event);
         producer.send(new ProducerRecord<>(KAFKA_TOPIC, jsonData));
       } catch (Exception e) {
         System.out.println("Exception publishing event: " + e);
       }
     }
   
     public static void main(String[] args) {
       DeviceEventPublisher publisher = new DeviceEventPublisher();
   
       // Generate and publish 10 random events
       for (int i = 0; i < NUM_EVENTS_TO_PUBLISH; i++) {
         DeviceEvent event = generateRandomEvent();
         publisher.publishEvent(event);
   
         if (i % 1000 == 0) {
           System.out.println("Published " + i + " events");
         }
       }
   
       System.out.println("Published " + NUM_EVENTS_TO_PUBLISH + " events to 
Kafka topic " + KAFKA_TOPIC);
   
       publisher.producer.close();
     }
   
     private static DeviceEvent generateRandomEvent() {
       DeviceEvent event = new DeviceEvent();
   
       // Random values for each field
       event.setCountry("Country" + random.nextInt(100));
       event.setDeviceId(UUID.randomUUID().toString());
       event.setDeviceType("Type" + random.nextInt(5));
       event.setSegments(generateRandomIntList());
       event.setOptions(generateRandomIntList());
       event.setRelationId(UUID.randomUUID().toString());
       event.setClient(random.nextInt(1000));
       event.setTimestamp(System.currentTimeMillis() / 1000L);
   
       return event;
     }
   
     @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy.class)
     private static class DeviceEvent {
       private String country;
       private String deviceId;
       private String deviceType;
       private List<Integer> segments;
       private List<Integer> options;
       private String relationId;
       private int client;
       private long timestamp;
   
       public String getCountry() {
         return country;
       }
   
       public void setCountry(String country) {
         this.country = country;
       }
   
       public String getDeviceId() {
         return deviceId;
       }
   
       public void setDeviceId(String deviceId) {
         this.deviceId = deviceId;
       }
   
       public String getDeviceType() {
         return deviceType;
       }
   
       public void setDeviceType(String deviceType) {
         this.deviceType = deviceType;
       }
   
       public List<Integer> getSegments() {
         return segments;
       }
   
       public void setSegments(List<Integer> segments) {
         this.segments = segments;
       }
   
       public List<Integer> getOptions() {
         return options;
       }
   
       public void setOptions(List<Integer> options) {
         this.options = options;
       }
   
       public String getRelationId() {
         return relationId;
       }
   
       public void setRelationId(String relationId) {
         this.relationId = relationId;
       }
   
       public int getClient() {
         return client;
       }
   
       public void setClient(int client) {
         this.client = client;
       }
   
       public long getTimestamp() {
         return timestamp;
       }
   
       public void setTimestamp(long timestamp) {
         this.timestamp = timestamp;
       }
     }
   
     private static List<Integer> generateRandomIntList() {
       List<Integer> list = new ArrayList<>();
       int size = MV_ARRAY_LENGTH;
       for (int i = 0; i < size; i++) {
         list.add(random.nextInt(10000));
       }
       return list;
     }
   }
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to