github-code-scanning[bot] commented on code in PR #14419: URL: https://github.com/apache/druid/pull/14419#discussion_r1232736620
########## examples/quickstart/jupyter-notebooks/notebooks/02-ingestion/DruidDataDriver.py: ########## @@ -0,0 +1,1132 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +# DruidDataDriver - generates JSON records as a workload for Apache Druid. +# + +import argparse +import dateutil.parser +from datetime import datetime, timedelta +import json +import numpy as np +import random +import re +from sortedcontainers import SortedList +import string +import sys +import threading +import time + +############################################################################ +# +# DruidDataDriver simulates Druid workloads by producing JSON records. +# Use a JSON config file to describe the characteristics of the workload +# you want to simulate. +# +# Run the program as follows: +# python DruidDataDriver.py <config file name> <options> +# Options include: +# -n <total number of records to generate> +# -t <duration for generating records> +# +# See the associated documentation for the format of the config file. +# +############################################################################ + + +class FutureEvent: + def __init__(self, t): + self.t = t + self.name = threading.current_thread().name + self.event = threading.Event() + def get_time(self): + return self.t + def get_name(self): + return self.name + def __lt__(self, other): + return self.t < other.t + def __eq__(self, other): + return self.t == other.t + def __le__(self, other): + return self.t <= other.t + def __gt__(self, other): + return self.t > other.t + def __ge__(self, other): + return self.t >= other.t + + def __str__(self): + return 'FutureEvent('+self.name+', '+str(self.t)+')' + def pause(self): + #print(self.name+" pausing") + self.event.clear() + self.event.wait() + def resume(self): + #print(self.name+" resuming") + self.event.set() + +class Clock: + future_events = SortedList() + active_threads = 0 + lock = threading.Lock() + sleep_lock = threading.Lock() + + def __init__(self, time_type, start_time = datetime.now()): + self.sim_time = start_time + self.time_type = time_type + + def __str__(self): + s = 'Clock(time='+str(self.sim_time) + for e in self.future_events: + s += ', '+str(e) + s += ')' + return s + + def activate_thread(self): + if self.time_type == 'SIM': + self.lock.acquire() + self.active_threads += 1 + self.lock.release() + + def deactivate_thread(self): + if self.time_type == 'SIM': + self.lock.acquire() + self.active_threads -= 1 + self.lock.release() + + def end_thread(self): + if self.time_type == 'SIM': + self.lock.acquire() + self.active_threads -= 1 + if len(self.future_events) > 0: + self.remove_event().resume() + self.lock.release() + + def release_all(self): + if self.time_type == 'SIM': + self.lock.acquire() + #print('release_all - active_threads = '+str(self.active_threads)) + for e in self.future_events: + e.resume() + self.lock.release() + + def add_event(self, future_t): + this_event = FutureEvent(future_t) + self.future_events.add(this_event) + #print('add_event (after) '+threading.current_thread().name+' - '+str(self)) + return this_event + + def remove_event(self): + #print('remove_event (before) '+threading.current_thread().name+' - '+str(self)) + next_event = self.future_events[0] + self.future_events.remove(next_event) + return next_event + + def pause(self, event): + self.active_threads -= 1 + self.lock.release() + event.pause() + self.lock.acquire() + self.active_threads += 1 + + def resume(self, event): + event.resume() + + def now(self): + if self.time_type == 'SIM': + t = self.sim_time + else: + t = datetime.now() + return t + + def sleep(self, delta): + if self.time_type == 'SIM': # Simulated time + self.lock.acquire() + #print(threading.current_thread().name+" begin sleep "+str(self.sim_time)+" + "+str(delta)) + this_event = self.add_event(self.sim_time + timedelta(seconds=delta)) + #print(threading.current_thread().name+" active threads "+str(self.active_threads)) + if self.active_threads == 1: + next_event = self.remove_event() + if str(this_event) != str(next_event): + self.resume(next_event) + #print(threading.current_thread().name+" start pause if") + self.pause(this_event) + #print(threading.current_thread().name+" end pause if") + else: + #print(threading.current_thread().name+" start pause else") + self.pause(this_event) + #print(threading.current_thread().name+" end pause else") + self.sim_time = this_event.get_time() + #print(threading.current_thread().name+" end sleep "+str(self.sim_time)) + self.lock.release() + + else: # Real time + time.sleep(delta) + + +# +# Set up the target +# + +class PrintStdout: + lock = threading.Lock() + def print(self, record): + with self.lock: + print(str(record)) + sys.stdout.flush() + def __str__(self): + return '#printStdout()' + +class PrintFile: + f = None + def __init__(self, file_name): + self.file_name = file_name + self.f = open(file_name, 'w') + def __del__(self): + if self.f != None: + self.f.close() + def __str__(self): + return 'PrintFile(file_name='+self.file_name+')' + def print(self, record): + self.f.write(str(record)+'\n') + self.f.flush() + +class PrintKafka: + producer = None + topic = None + def __init__(self, endpoint, topic, security_protocol, compression_type): + from kafka import KafkaProducer + + #print('PrintKafka('+str(endpoint)+', '+str(topic)+', '+str(security_protocol)+', '+str(compression_type)+')') + self.endpoint = endpoint + self.producer = KafkaProducer(bootstrap_servers=endpoint, security_protocol=security_protocol, compression_type=compression_type, value_serializer=lambda v: json.dumps(v).encode('utf-8')) + self.topic = topic + def __str__(self): + return 'PrintKafka(endpoint='+self.endpoint+', topic='+self.topic+')' + def print(self, record): + self.producer.send(self.topic, json.loads(str(record))) + +class PrintConfluent: + producer = None + topic = None + username = None + password = None + def __init__(self, servers, topic, username, password): + from confluent_kafka import Producer + + #print('PrintKafka('+str(endpoint)+', '+str(topic)+', '+str(security_protocol)+', '+str(compression_type)+')') + self.servers = servers + self.producer = Producer({ + 'bootstrap.servers': servers, + 'sasl.mechanisms': 'PLAIN', + 'security.protocol': 'SASL_SSL', + 'sasl.username': username, + 'sasl.password': password + }) + self.topic = topic + self.username = username + self.password = password + def __str__(self): + return 'PrintConfluent(servers='+self.servers+', topic='+self.topic+', username='+self.username+', password='+self.password+')' + def print(self, record): + print('producing '+str(record)) + self.producer.produce(topic=self.topic, value=str(record)) + self.producer.flush() + + +# +# Handle distributions +# + +class DistConstant: + def __init__(self, value): + self.value = value + def __str__(self): + return 'DistConstant(value='+str(self.value)+')' + def get_sample(self): + return self.value + +class DistUniform: + def __init__(self, min_value, max_value): + self.min_value = min_value + self.max_value = max_value + def __str__(self): + return 'DistUniform(min_value='+str(self.min_value)+', max_value='+str(self.max_value)+')' + def get_sample(self): + return np.random.uniform(self.min_value, self.max_value+1) + +class DistExponential: + def __init__(self, mean): + self.mean = mean + def __str__(self): + return 'DistExponential(mean='+str(self.mean)+')' + def get_sample(self): + return np.random.exponential(scale = self.mean) + +class DistNormal: + def __init__(self, mean, stddev): + self.mean = mean + self.stddev = stddev + def __str__(self): + return 'DistNormal(mean='+str(self.mean )+', stddev='+str(self.stddev)+')' + def get_sample(self): + return np.random.normal(self.mean, self.stddev) + +def parse_distribution(desc): + dist_type = desc['type'].lower() + dist_gen = None + if dist_type == 'constant': + value = desc['value'] + dist_gen = DistConstant(value) + elif dist_type == 'uniform': + min_value = desc['min'] + max_value = desc['max'] + dist_gen = DistUniform(min_value, max_value) + elif dist_type == 'exponential': + mean = desc['mean'] + dist_gen = DistExponential(mean) + elif dist_type == 'normal': + mean = desc['mean'] + stddev = desc['stddev'] + dist_gen = DistNormal(mean, stddev) + else: + print('Error: Unknown distribution "'+dist_type+'"') + exit() + return dist_gen + +def parse_timestamp_distribution(desc): + dist_type = desc['type'].lower() + dist_gen = None + if dist_type == 'constant': + value = dateutil.parser.isoparse(desc['value']).timestamp() + dist_gen = DistConstant(value) + elif dist_type == 'uniform': + min_value = dateutil.parser.isoparse(desc['min']).timestamp() + max_value = dateutil.parser.isoparse(desc['max']).timestamp() + dist_gen = DistUniform(min_value, max_value) + elif dist_type == 'exponential': + mean = dateutil.parser.isoparse(desc['mean']).timestamp() + dist_gen = DistExponential(mean) + elif dist_type == 'normal': + mean = desc[dateutil.parser.isoparse(desc['mean']).timestamp()] + stddev = desc['stddev'] + dist_gen = DistNormal(mean, stddev) + else: + print('Error: Unknown distribution "'+dist_type+'"') + exit() + return dist_gen + + +# +# Set up the dimensions for the emitters (see below) +# There is one element class for each dimension type. This code creates a list of +# elements and then runs through the list to create a single record. +# Notice that the get_json_field_string() method produces the JSON dimension +# field object based on the dimension configuration. +# The get_stochastic_value() method is like a private method used to get a random +# idividual value. +# + +class ElementNow: # The __time dimension + def __init__(self, global_clock): + self.global_clock = global_clock_ + def __str__(self): + return 'ElementNow()' + def get_json_field_string(self): + now = self.global_clock.now().isoformat()[:-3] + return '"__time":"'+now+'"' + +class ElementCounter: # The __time dimension + def __init__(self, desc): + self.name = desc['name'] + if 'percent_nulls' in desc.keys(): + self.percent_nulls = desc['percent_nulls'] / 100.0 + else: + self.percent_nulls = 0.0 + if 'percent_missing' in desc.keys(): + self.percent_missing = desc['percent_missing'] / 100.0 + else: + self.percent_missing = 0.0 + if 'start' in desc.keys(): + self.start = desc['start'] + else: + self.start = 0 + if 'increment' in desc.keys(): + self.increment = desc['increment'] + else: + self.increment = 1 + self.value = self.start + def __str__(self): + s = 'ElementCounter(name='+self.name + if self.start != 0: + s += ', '+str(self.start) + if self.increment != 1: + s += ', '+str(self.increment) + s += ')' + return s + + def get_stochastic_value(self): + v = self.value + self.value += self.increment + return v + + def get_json_field_string(self): + if random.random() < self.percent_nulls: + s = '"'+self.name+'": null' + else: + s = '"'+self.name+'":"'+str(self.get_stochastic_value())+'"' + return s + + def is_missing(self): + return random.random() < self.percent_missing + + +class ElementEnum: # enumeration dimensions + def __init__(self, desc): + self.name = desc['name'] + if 'percent_nulls' in desc.keys(): + self.percent_nulls = desc['percent_nulls'] / 100.0 + else: + self.percent_nulls = 0.0 + if 'percent_missing' in desc.keys(): + self.percent_missing = desc['percent_missing'] / 100.0 + else: + self.percent_missing = 0.0 + self.cardinality = desc['values'] + if 'cardinality_distribution' not in desc.keys(): + print('Element '+self.name+' specifies a cardinality without a cardinality distribution') + exit() + self.cardinality_distribution = parse_distribution(desc['cardinality_distribution']) + + def __str__(self): + return 'ElementEnum(name='+self.name+', cardinality='+str(self.cardinality)+', cardinality_distribution='+str(self.cardinality_distribution)+')' + + def get_stochastic_value(self): + index = int(self.cardinality_distribution.get_sample()) + if index < 0: + index = 0 + if index >= len(self.cardinality): + index = len(self.cardinality)-1 + return self.cardinality[index] + + def get_json_field_string(self): + if random.random() < self.percent_nulls: + s = '"'+self.name+'": null' + else: + s = '"'+self.name+'":"'+str(self.get_stochastic_value())+'"' + return s + + def is_missing(self): + return random.random() < self.percent_missing + +class ElementVariable: # Variable dimensions + def __init__(self, desc): + self.name = desc['name'] + self.variable_name = desc['variable'] + + def __str__(self): + return 'ElementVariable(name='+self.name+', value='+self.variable_name+')' + + def get_json_field_string(self, variables): # NOTE: because of timing, this method has a different signature than the other elements + value = variables[self.variable_name] + return '"'+self.name+'":"'+str(value)+'"' + + +class ElementBase: # Base class for the remainder of the dimensions + def __init__(self, desc): + self.name = desc['name'] + if 'percent_nulls' in desc.keys(): + self.percent_nulls = desc['percent_nulls'] / 100.0 + else: + self.percent_nulls = 0.0 + if 'percent_missing' in desc.keys(): + self.percent_missing = desc['percent_missing'] / 100.0 + else: + self.percent_missing = 0.0 + + self.cardinality_setting = desc['cardinality'] + self.cardinality_distribution = None + + if self.cardinality_setting == 0: + self.cardinality = None + + else: + self.cardinality = [] + if 'cardinality_distribution' not in desc.keys(): + print('Element '+self.name+' specifies a cardinality without a cardinality distribution') + exit() + self.cardinality_distribution = parse_distribution(desc['cardinality_distribution']) + self.init_cardinality() + + def init_cardinality(self): + for i in range(self.cardinality_setting): + value = None + while True: + value = self.get_stochastic_value() + if value not in self.cardinality: + break + self.cardinality.append(value) + + + def get_stochastic_value(self): + pass + + def get_json_field_string(self): + if random.random() < self.percent_nulls: + s = '"'+self.name+'": null' + else: + if self.cardinality is None: + value = self.get_stochastic_value() + else: + index = int(self.cardinality_distribution.get_sample()) + if index < 0: + index = 0 + if index >= len(self.cardinality): + index = len(self.cardinality)-1 + value = self.cardinality[index] + s = '"'+self.name+'":'+str(value) + return s + + def is_missing(self): + return random.random() < self.percent_missing + + +class ElementString(ElementBase): + def __init__(self, desc): + self.length_distribution = parse_distribution(desc['length_distribution']) + if 'chars' in desc: + self.chars = desc['chars'] + else: + self.chars = string.printable + super().__init__(desc) + + def __str__(self): + return 'ElementString(name='+self.name+', cardinality='+str(self.cardinality)+', cardinality_distribution='+str(self.cardinality_distribution)+', chars='+self.chars+')' + + def get_stochastic_value(self): + length = int(self.length_distribution.get_sample()) + return ''.join(random.choices(list(self.chars), k=length)) + + def get_json_field_string(self): + if random.random() < self.percent_nulls: + s = '"'+self.name+'": null' + else: + if self.cardinality is None: + value = self.get_stochastic_value() + else: + index = int(self.cardinality_distribution.get_sample()) + if index < 0: + index = 0 + if index >= len(self.cardinality): + index = len(self.cardinality)-1 + value = self.cardinality[index] + s = '"'+self.name+'":"'+str(value)+'"' + return s + +class ElementInt(ElementBase): + def __init__(self, desc): + self.value_distribution = parse_distribution(desc['distribution']) + super().__init__(desc) + + def __str__(self): + return 'ElementInt(name='+self.name+', value_distribution='+str(self.value_distribution)+', cardinality='+str(self.cardinality)+', cardinality_distribution='+str(self.cardinality_distribution)+')' + + def get_stochastic_value(self): + return int(self.value_distribution.get_sample()) + +class ElementFloat(ElementBase): + def __init__(self, desc): + self.value_distribution = parse_distribution(desc['distribution']) + if 'precision' in desc: + self.precision = desc['precision'] + else: + self.precision = None + super().__init__(desc) + + def __str__(self): + return 'ElementFloat(name='+self.name+', value_distribution='+str(self.value_distribution)+', cardinality='+str(self.cardinality)+', cardinality_distribution='+str(self.cardinality_distribution)+')' + + def get_stochastic_value(self): + return float(self.value_distribution.get_sample()) + + def get_json_field_string(self): + if random.random() < self.percent_nulls: + s = '"'+self.name+'": null' + else: + if self.cardinality is None: + value = self.get_stochastic_value() + else: + index = int(self.cardinality_distribution.get_sample()) + if index < 0: + index = 0 + if index >= len(self.cardinality): + index = len(self.cardinality)-1 + value = self.cardinality[index] + if self.precision is None: + s = '"'+self.name+'":'+str(value) + else: + format = '%.'+str(self.precision)+'f' + s = '"'+self.name+'":'+str(format%value) + return s + +class ElementTimestamp(ElementBase): + def __init__(self, desc): + super().__init__(desc) + + def __str__(self): + return 'ElementTimestamp(name='+self.name+', value_distribution='+str(self.value_distribution)+', cardinality='+str(self.cardinality)+', cardinality_distribution='+str(self.cardinality_distribution)+')' + + def get_stochastic_value(self): + return datetime.fromtimestamp(self.value_distribution.get_sample()).isoformat()[:-3] + + def get_json_field_string(self): + if random.random() < self.percent_nulls: + s = '"'+self.name+'": null' + else: + if self.cardinality is None: + value = self.get_stochastic_value() + else: + index = int(self.cardinality_distribution.get_sample()) + if index < 0: + index = 0 + if index >= len(self.cardinality): + index = len(self.cardinality)-1 + value = self.cardinality[index] + s = '"'+self.name+'":"'+str(value)+'"' + return s + + def is_missing(self): + return random.random() < self.percent_missing + +class ElementIPAddress(ElementBase): + def __init__(self, desc): + self.value_distribution = parse_distribution(desc['distribution']) + super().__init__(desc) + + def __str__(self): + return 'ElementIPAddress(name='+self.name+', value_distribution='+str(self.value_distribution)+', cardinality='+str(self.cardinality)+', cardinality_distribution='+str(self.cardinality_distribution)+')' + + def get_stochastic_value(self): + value = int(self.value_distribution.get_sample()) + return str((value & 0xFF000000) >> 24)+'.'+str((value & 0x00FF0000) >> 16)+'.'+str((value & 0x0000FF00) >> 8)+'.'+str(value & 0x000000FF) + + def get_json_field_string(self): + if random.random() < self.percent_nulls: + s = '"'+self.name+'": null' + else: + if self.cardinality is None: + value = self.get_stochastic_value() + else: + index = int(self.cardinality_distribution.get_sample()) + if index < 0: + index = 0 + if index >= len(self.cardinality): + index = len(self.cardinality)-1 + value = self.cardinality[index] + s = '"'+self.name+'":"'+str(value)+'"' + return s + +class ElementObject(): + def __init__(self, desc): + self.name = desc['name'] + self.dimensions = get_variables(desc['dimensions']) + if 'percent_nulls' in desc.keys(): + self.percent_nulls = desc['percent_nulls'] / 100.0 + else: + self.percent_nulls = 0.0 + if 'percent_missing' in desc.keys(): + self.percent_missing = desc['percent_missing'] / 100.0 + else: + self.percent_missing = 0.0 + cardinality = desc['cardinality'] + if cardinality == 0: + self.cardinality = None + self.cardinality_distribution = None + else: + self.cardinality = [] + if 'cardinality_distribution' not in desc.keys(): + print('Element '+self.name+' specifies a cardinality without a cardinality distribution') + exit() + self.cardinality_distribution = parse_distribution(desc['cardinality_distribution']) + for i in range(cardinality): + value = None + while True: + value = self.get_instance() + if value not in self.cardinality: + break + self.cardinality.append(value) + + def __str__(self): + s = 'ElementObject(name='+self.name+', dimensions=[' + for e in self.dimensions: + s += ',' + str(e) + s += '])' + return s + + def get_instance(self): + s = '"'+self.name+'": {' + for e in self.dimensions: + s += e.get_json_field_string() + ',' + s = s[:-1] + '}' + return s + + + def get_json_field_string(self): + if random.random() < self.percent_nulls: + s = '"'+self.name+'": null' + else: + if self.cardinality is None: + s = self.get_instance() + else: + index = int(self.cardinality_distribution.get_sample()) + if index < 0: + index = 0 + if index >= len(self.cardinality): + index = len(self.cardinality)-1 + s = self.cardinality[index] + return s + + def is_missing(self): + return random.random() < self.percent_missing + +class ElementList(): + def __init__(self, desc): + self.name = desc['name'] + self.elements = get_variables(desc['elements']) + self.length_distribution = parse_distribution(desc['length_distribution']) + self.selection_distribution = parse_distribution(desc['selection_distribution']) + if 'percent_nulls' in desc.keys(): + self.percent_nulls = desc['percent_nulls'] / 100.0 + else: + self.percent_nulls = 0.0 + if 'percent_missing' in desc.keys(): + self.percent_missing = desc['percent_missing'] / 100.0 + else: + self.percent_missing = 0.0 + cardinality = desc['cardinality'] + if cardinality == 0: + self.cardinality = None + self.cardinality_distribution = None + else: + self.cardinality = [] + if 'cardinality_distribution' not in desc.keys(): + print('Element '+self.name+' specifies a cardinality without a cardinality distribution') + exit() + self.cardinality_distribution = parse_distribution(desc['cardinality_distribution']) + for i in range(cardinality): + Value = None Review Comment: ## Unused local variable Variable Value is not used. [Show more details](https://github.com/apache/druid/security/code-scanning/5104) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
