Hi, Dong, Jiangjie,

20. (1) I agree that ideally we'd like to use direct RPC for
broker-to-broker communication instead of ZK. However, in the alternative
design, the failed log directory path also serves as the persistent state
for remembering the offline partitions. This is similar to the
controller_managed_state path in your design. The difference is that the
alternative design stores the state in fewer ZK paths, which helps reduce
the controller failover time. (2) I agree that we want the controller to be
the single place to make decisions. However, intuitively, the failure
reporting should be done where the failure is originated. For example, if a
broker fails, the broker reports failure by de-registering from ZK. The
failed log directory path is similar in that regard. (3) I am not worried
about the additional load from extra LeaderAndIsrRequest. What I worry
about is any unnecessary additional complexity in the controller. To me,
the additional complexity in the current design is the additional state
management in each of the existing state handling (e.g., topic creation,
controller failover, etc), and the additional synchronization since the
additional state management is not initiated from the ZK event handling
thread.

21. One of the reasons that we need to send a StopReplicaRequest to offline
replica is to handle controlled shutdown. In that case, a broker is still
alive, but indicates to the controller that it plans to shut down. Being
able to stop the replica in the shutting down broker reduces churns in ISR.
So, for simplicity, it's probably easier to always send a StopReplicaRequest
to any offline replica.

Thanks,

Jun


On Tue, Feb 21, 2017 at 2:37 PM, Dong Lin <lindon...@gmail.com> wrote:

> Hey Jun,
>
> Thanks much for your comments.
>
> I actually proposed the design to store both offline replicas and created
> replicas in per-broker znode before switching to the design in the current
> KIP. The current design stores created replicas in per-partition znode and
> transmits offline replicas via LeaderAndIsrResponse. The original solution
> is roughly the same as what you suggested. The advantage of the current
> solution is kind of philosophical: 1) we want to transmit data (e.g.
> offline replicas) using RPC and reduce dependency on zookeeper; 2) we want
> controller to be the only one that determines any state (e.g. offline
> replicas) that will be exposed to user. The advantage of the solution to
> store offline replica in zookeeper is that we can save one roundtrip time
> for controller to handle log directory failure. However, this extra
> roundtrip time should not be a big deal since the log directory failure is
> rare and inefficiency of extra latency is less of a problem when there is
> log directory failure.
>
> Do you think the two philosophical advantages of the current KIP make
> sense? If not, then I can switch to the original design that stores offline
> replicas in zookeeper. It is actually written already. One disadvantage is
> that we have to make non-trivial change the KIP (e.g. no create flag in
> LeaderAndIsrRequest and no created flag zookeeper) and restart this KIP
> discussion.
>
> Regarding 21, it seems to me that LeaderAndIsrRequest/StopReplicaRequest
> only makes sense when broker can make the choice (e.g. fetch data for this
> replica or not). In the case that the log directory of the replica is
> already offline, broker have to stop fetching data for this replica
> regardless of what controller tells it to do. Thus it seems cleaner for
> broker to stop fetch data for this replica immediately. The advantage of
> this solution is that the controller logic is simpler since it doesn't need
> to send StopReplicaRequest in case of log directory failure, and the log4j
> log is also cleaner. Is there specific advantage of having controller send
> tells broker to stop fetching data for offline replicas?
>
> Regarding 22, I agree with your observation that it will happen. I will
> update the KIP and specify that broker will exist with proper error message
> in the log and user needs to manually remove partitions and restart the
> broker.
>
> Thanks!
> Dong
>
>
>
> On Mon, Feb 20, 2017 at 10:17 PM, Jun Rao <j...@confluent.io> wrote:
>
> > Hi, Dong,
> >
> > Sorry for the delay. A few more comments.
> >
> > 20. One complexity that I found in the current KIP is that the way the
> > broker communicates failed replicas to the controller is inefficient.
> When
> > a log directory fails, the broker only sends an indication through ZK to
> > the controller and the controller has to issue a LeaderAndIsrRequest to
> > discover which replicas are offline due to log directory failure. An
> > alternative approach is that when a log directory fails, the broker just
> > writes the failed the directory and the corresponding topic partitions
> in a
> > new failed log directory ZK path like the following.
> >
> > Failed log directory path:
> > /brokers/ids/[brokerId]/failed-log-directory/directory1 => { json of the
> > topic partitions in the log directory }.
> >
> > The controller just watches for child changes in
> > /brokers/ids/[brokerId]/failed-log-directory.
> > After reading this path, the broker knows the exact set of replicas that
> > are offline and can trigger that replica state change accordingly. This
> > saves an extra round of LeaderAndIsrRequest handling.
> >
> > With this new ZK path, we get probably get rid of/broker/topics/[topic]/
> > partitions/[partitionId]/controller_managed_state. The creation of a new
> > replica is expected to always succeed unless all log directories fail, in
> > which case, the broker goes down anyway. Then, during controller
> failover,
> > the controller just needs to additionally read from ZK the extra failed
> log
> > directory paths, which is many fewer than topics or partitions.
> >
> > On broker startup, if a log directory becomes available, the
> corresponding
> > log directory path in ZK will be removed.
> >
> > The downside of this approach is that the value of this new ZK path can
> be
> > large. However, even with 5K partition per log directory and 100 bytes
> per
> > partition, the size of the value is 500KB, still less than the default
> 1MB
> > znode limit in ZK.
> >
> > 21. "Broker will remove offline replica from its replica fetcher
> threads."
> > The proposal lets the broker remove the replica from the replica fetcher
> > thread when it detects a directory failure. An alternative is to only do
> > that until the broker receives the LeaderAndIsrRequest/
> StopReplicaRequest.
> > The benefit of this is that the controller is the only one who decides
> > which replica to be removed from the replica fetcher threads. The broker
> > also doesn't need additional logic to remove the replica from replica
> > fetcher threads. The downside is that in a small window, the replica
> fetch
> > thread will keep writing to the failed log directory and may pollute the
> > log4j log.
> >
> > 22. In the current design, there is a potential corner case issue that
> the
> > same partition may exist in more than one log directory at some point.
> > Consider the following steps: (1) a new topic t1 is created and the
> > controller sends LeaderAndIsrRequest to a broker; (2) the broker creates
> > partition t1-p1 in log dir1; (3) before the broker sends a response, it
> > goes down; (4) the broker is restarted with log dir1 unreadable; (5) the
> > broker receives a new LeaderAndIsrRequest and creates partition t1-p1 on
> > log dir2; (6) at some point, the broker is restarted with log dir1 fixed.
> > Now partition t1-p1 is in two log dirs. The alternative approach that I
> > suggested above may suffer from a similar corner case issue. Since this
> is
> > rare, if the broker detects this during broker startup, it can probably
> > just log an error and exit. The admin can remove the redundant partitions
> > manually and then restart the broker.
> >
> > Thanks,
> >
> > Jun
> >
> > On Sat, Feb 18, 2017 at 9:31 PM, Dong Lin <lindon...@gmail.com> wrote:
> >
> > > Hey Jun,
> > >
> > > Could you please let me know if the solutions above could address your
> > > concern? I really want to move the discussion forward.
> > >
> > > Thanks,
> > > Dong
> > >
> > >
> > > On Tue, Feb 14, 2017 at 8:17 PM, Dong Lin <lindon...@gmail.com> wrote:
> > >
> > > > Hey Jun,
> > > >
> > > > Thanks for all your help and time to discuss this KIP. When you get
> the
> > > > time, could you let me know if the previous answers address the
> > concern?
> > > >
> > > > I think the more interesting question in your last email is where we
> > > > should store the "created" flag in ZK. I proposed the solution that I
> > > like
> > > > most, i.e. store it together with the replica assignment data in the
> > > /brokers/topics/[topic].
> > > > In order to expedite discussion, let me provide another two ideas to
> > > > address the concern just in case the first idea doesn't work:
> > > >
> > > > - We can avoid extra controller ZK read when there is no disk failure
> > > > (95% of time?). When controller starts, it doesn't
> > > > read controller_managed_state in ZK and sends LeaderAndIsrRequest
> with
> > > > "create = false". Only if LeaderAndIsrResponse shows failure for any
> > > > replica, then controller will read controller_managed_state for this
> > > > partition and re-send LeaderAndIsrRequset with "create=true" if this
> > > > replica has not been created.
> > > >
> > > > - We can significantly reduce this ZK read time by making
> > > > controller_managed_state a topic level information in ZK, e.g.
> > > > /brokers/topics/[topic]/state. Given that most topic has 10+
> partition,
> > > > the extra ZK read time should be less than 10% of the existing total
> zk
> > > > read time during controller failover.
> > > >
> > > > Thanks!
> > > > Dong
> > > >
> > > >
> > > > On Tue, Feb 14, 2017 at 7:30 AM, Dong Lin <lindon...@gmail.com>
> wrote:
> > > >
> > > >> Hey Jun,
> > > >>
> > > >> I just realized that you may be suggesting that a tool for listing
> > > >> offline directories is necessary for KIP-112 by asking whether
> KIP-112
> > > and
> > > >> KIP-113 will be in the same release. I think such a tool is useful
> but
> > > >> doesn't have to be included in KIP-112. This is because as of now
> > admin
> > > >> needs to log into broker machine and check broker log to figure out
> > the
> > > >> cause of broker failure and the bad log directory in case of disk
> > > failure.
> > > >> The KIP-112 won't make it harder since admin can still figure out
> the
> > > bad
> > > >> log directory by doing the same thing. Thus it is probably OK to
> just
> > > >> include this script in KIP-113. Regardless, my hope is to finish
> both
> > > KIPs
> > > >> ASAP and make them in the same release since both KIPs are needed
> for
> > > the
> > > >> JBOD setup.
> > > >>
> > > >> Thanks,
> > > >> Dong
> > > >>
> > > >> On Mon, Feb 13, 2017 at 5:52 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > >>
> > > >>> And the test plan has also been updated to simulate disk failure by
> > > >>> changing log directory permission to 000.
> > > >>>
> > > >>> On Mon, Feb 13, 2017 at 5:50 PM, Dong Lin <lindon...@gmail.com>
> > wrote:
> > > >>>
> > > >>>> Hi Jun,
> > > >>>>
> > > >>>> Thanks for the reply. These comments are very helpful. Let me
> answer
> > > >>>> them inline.
> > > >>>>
> > > >>>>
> > > >>>> On Mon, Feb 13, 2017 at 3:25 PM, Jun Rao <j...@confluent.io>
> wrote:
> > > >>>>
> > > >>>>> Hi, Dong,
> > > >>>>>
> > > >>>>> Thanks for the reply. A few more replies and new comments below.
> > > >>>>>
> > > >>>>> On Fri, Feb 10, 2017 at 4:27 PM, Dong Lin <lindon...@gmail.com>
> > > wrote:
> > > >>>>>
> > > >>>>> > Hi Jun,
> > > >>>>> >
> > > >>>>> > Thanks for the detailed comments. Please see answers inline:
> > > >>>>> >
> > > >>>>> > On Fri, Feb 10, 2017 at 3:08 PM, Jun Rao <j...@confluent.io>
> > wrote:
> > > >>>>> >
> > > >>>>> > > Hi, Dong,
> > > >>>>> > >
> > > >>>>> > > Thanks for the updated wiki. A few comments below.
> > > >>>>> > >
> > > >>>>> > > 1. Topics get created
> > > >>>>> > > 1.1 Instead of storing successfully created replicas in ZK,
> > could
> > > >>>>> we
> > > >>>>> > store
> > > >>>>> > > unsuccessfully created replicas in ZK? Since the latter is
> less
> > > >>>>> common,
> > > >>>>> > it
> > > >>>>> > > probably reduces the load on ZK.
> > > >>>>> > >
> > > >>>>> >
> > > >>>>> > We can store unsuccessfully created replicas in ZK. But I am
> not
> > > >>>>> sure if
> > > >>>>> > that can reduce write load on ZK.
> > > >>>>> >
> > > >>>>> > If we want to reduce write load on ZK using by store
> > unsuccessfully
> > > >>>>> created
> > > >>>>> > replicas in ZK, then broker should not write to ZK if all
> > replicas
> > > >>>>> are
> > > >>>>> > successfully created. It means that if
> > > /broker/topics/[topic]/partiti
> > > >>>>> > ons/[partitionId]/controller_managed_state doesn't exist in ZK
> > for
> > > >>>>> a given
> > > >>>>> > partition, we have to assume all replicas of this partition
> have
> > > been
> > > >>>>> > successfully created and send LeaderAndIsrRequest with create =
> > > >>>>> false. This
> > > >>>>> > becomes a problem if controller crashes before receiving
> > > >>>>> > LeaderAndIsrResponse to validate whether a replica has been
> > > created.
> > > >>>>> >
> > > >>>>> > I think this approach and reduce the number of bytes stored in
> > ZK.
> > > >>>>> But I am
> > > >>>>> > not sure if this is a concern.
> > > >>>>> >
> > > >>>>> >
> > > >>>>> >
> > > >>>>> I was mostly concerned about the controller failover time.
> > Currently,
> > > >>>>> the
> > > >>>>> controller failover is likely dominated by the cost of reading
> > > >>>>> topic/partition level information from ZK. If we add another
> > > partition
> > > >>>>> level path in ZK, it probably will double the controller failover
> > > >>>>> time. If
> > > >>>>> the approach of representing the non-created replicas doesn't
> work,
> > > >>>>> have
> > > >>>>> you considered just adding the created flag in the leaderAndIsr
> > path
> > > >>>>> in ZK?
> > > >>>>>
> > > >>>>>
> > > >>>> Yes, I have considered adding the created flag in the leaderAndIsr
> > > path
> > > >>>> in ZK. If we were to add created flag per replica in the
> > > >>>> LeaderAndIsrRequest, then it requires a lot of change in the code
> > > base.
> > > >>>>
> > > >>>> If we don't add created flag per replica in the
> LeaderAndIsrRequest,
> > > >>>> then the information in leaderAndIsr path in ZK and
> > > LeaderAndIsrRequest
> > > >>>> would be different. Further, the procedure for broker to update
> ISR
> > > in ZK
> > > >>>> will be a bit complicated. When leader updates leaderAndIsr path
> in
> > > ZK, it
> > > >>>> will have to first read created flags from ZK, change isr, and
> write
> > > >>>> leaderAndIsr back to ZK. And it needs to check znode version and
> > > re-try
> > > >>>> write operation in ZK if controller has updated ZK during this
> > > period. This
> > > >>>> is in contrast to the current implementation where the leader
> either
> > > gets
> > > >>>> all the information from LeaderAndIsrRequest sent by controller,
> or
> > > >>>> determine the infromation by itself (e.g. ISR), before writing to
> > > >>>> leaderAndIsr path in ZK.
> > > >>>>
> > > >>>> It seems to me that the above solution is a bit complicated and
> not
> > > >>>> clean. Thus I come up with the design in this KIP to store this
> > > created
> > > >>>> flag in a separate zk path. The path is named
> > > controller_managed_state to
> > > >>>> indicate that we can store in this znode all information that is
> > > managed by
> > > >>>> controller only, as opposed to ISR.
> > > >>>>
> > > >>>> I agree with your concern of increased ZK read time during
> > controller
> > > >>>> failover. How about we store the "created" information in the
> > > >>>> znode /brokers/topics/[topic]? We can change that znode to have
> the
> > > >>>> following data format:
> > > >>>>
> > > >>>> {
> > > >>>>   "version" : 2,
> > > >>>>   "created" : {
> > > >>>>     "1" : [1, 2, 3],
> > > >>>>     ...
> > > >>>>   }
> > > >>>>   "partition" : {
> > > >>>>     "1" : [1, 2, 3],
> > > >>>>     ...
> > > >>>>   }
> > > >>>> }
> > > >>>>
> > > >>>> We won't have extra zk read using this solution. It also seems
> > > >>>> reasonable to put the partition assignment information together
> with
> > > >>>> replica creation information. The latter is only changed once
> after
> > > the
> > > >>>> partition is created or re-assigned.
> > > >>>>
> > > >>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> >
> > > >>>>> > > 1.2 If an error is received for a follower, does the
> controller
> > > >>>>> eagerly
> > > >>>>> > > remove it from ISR or do we just let the leader removes it
> > after
> > > >>>>> timeout?
> > > >>>>> > >
> > > >>>>> >
> > > >>>>> > No, Controller will not actively remove it from ISR. But
> > controller
> > > >>>>> will
> > > >>>>> > recognize it as offline replica and propagate this information
> to
> > > all
> > > >>>>> > brokers via UpdateMetadataRequest. Each leader can use this
> > > >>>>> information to
> > > >>>>> > actively remove offline replica from ISR set. I have updated to
> > > wiki
> > > >>>>> to
> > > >>>>> > clarify it.
> > > >>>>> >
> > > >>>>> >
> > > >>>>>
> > > >>>>> That seems inconsistent with how the controller deals with
> offline
> > > >>>>> replicas
> > > >>>>> due to broker failures. When that happens, the broker will (1)
> > select
> > > >>>>> a new
> > > >>>>> leader if the offline replica is the leader; (2) remove the
> replica
> > > >>>>> from
> > > >>>>> ISR if the offline replica is the follower. So, intuitively, it
> > seems
> > > >>>>> that
> > > >>>>> we should be doing the same thing when dealing with offline
> > replicas
> > > >>>>> due to
> > > >>>>> disk failure.
> > > >>>>>
> > > >>>>
> > > >>>> My bad. I misunderstand how the controller currently handles
> broker
> > > >>>> failure and ISR change. Yes we should do the same thing when
> dealing
> > > with
> > > >>>> offline replicas here. I have updated the KIP to specify that,
> when
> > an
> > > >>>> offline replica is discovered by controller, the controller
> removes
> > > offline
> > > >>>> replicas from ISR in the ZK and sends LeaderAndIsrRequest with
> > > updated ISR
> > > >>>> to be used by partition leaders.
> > > >>>>
> > > >>>>
> > > >>>>>
> > > >>>>>
> > > >>>>>
> > > >>>>> >
> > > >>>>> > > 1.3 Similar, if an error is received for a leader, should the
> > > >>>>> controller
> > > >>>>> > > trigger leader election again?
> > > >>>>> > >
> > > >>>>> >
> > > >>>>> > Yes, controller will trigger leader election if leader replica
> is
> > > >>>>> offline.
> > > >>>>> > I have updated the wiki to clarify it.
> > > >>>>> >
> > > >>>>> >
> > > >>>>> > >
> > > >>>>> > > 2. A log directory stops working on a broker during runtime:
> > > >>>>> > > 2.1 It seems the broker remembers the failed directory after
> > > >>>>> hitting an
> > > >>>>> > > IOException and the failed directory won't be used for
> creating
> > > new
> > > >>>>> > > partitions until the broker is restarted? If so, could you
> add
> > > >>>>> that to
> > > >>>>> > the
> > > >>>>> > > wiki.
> > > >>>>> > >
> > > >>>>> >
> > > >>>>> > Right, broker assumes a log directory to be good after it
> starts,
> > > >>>>> and mark
> > > >>>>> > log directory as bad once there is IOException when broker
> > attempts
> > > >>>>> to
> > > >>>>> > access the log directory. New replicas will only be created on
> > good
> > > >>>>> log
> > > >>>>> > directory. I just added this to the KIP.
> > > >>>>> >
> > > >>>>> >
> > > >>>>> > > 2.2 Could you be a bit more specific on how and during which
> > > >>>>> operation
> > > >>>>> > the
> > > >>>>> > > broker detects directory failure? Is it when the broker hits
> an
> > > >>>>> > IOException
> > > >>>>> > > during writes, or both reads and writes?  For example, during
> > > >>>>> broker
> > > >>>>> > > startup, it only reads from each of the log directories, if
> it
> > > >>>>> hits an
> > > >>>>> > > IOException there, does the broker immediately mark the
> > directory
> > > >>>>> as
> > > >>>>> > > offline?
> > > >>>>> > >
> > > >>>>> >
> > > >>>>> > Broker marks log directory as bad once there is IOException
> when
> > > >>>>> broker
> > > >>>>> > attempts to access the log directory. This includes read and
> > write.
> > > >>>>> These
> > > >>>>> > operations include log append, log read, log cleaning,
> watermark
> > > >>>>> checkpoint
> > > >>>>> > etc. If broker hits IOException when it reads from each of the
> > log
> > > >>>>> > directory during startup, it immediately mark the directory as
> > > >>>>> offline.
> > > >>>>> >
> > > >>>>> > I just updated the KIP to clarify it.
> > > >>>>> >
> > > >>>>> >
> > > >>>>> > > 3. Partition reassignment: If we know a replica is offline,
> do
> > we
> > > >>>>> still
> > > >>>>> > > want to send StopReplicaRequest to it?
> > > >>>>> > >
> > > >>>>> >
> > > >>>>> > No, controller doesn't send StopReplicaRequest for an offline
> > > >>>>> replica.
> > > >>>>> > Controller treats this scenario in the same way that exiting
> > Kafka
> > > >>>>> > implementation does when the broker of this replica is offline.
> > > >>>>> >
> > > >>>>> >
> > > >>>>> > >
> > > >>>>> > > 4. UpdateMetadataRequestPartitionState: For
> offline_replicas,
> > do
> > > >>>>> they
> > > >>>>> > only
> > > >>>>> > > include offline replicas due to log directory failures or do
> > they
> > > >>>>> also
> > > >>>>> > > include offline replicas due to broker failure?
> > > >>>>> > >
> > > >>>>> >
> > > >>>>> > UpdateMetadataRequestPartitionState's offline_replicas include
> > > >>>>> offline
> > > >>>>> > replicas due to both log directory failure and broker failure.
> > This
> > > >>>>> is to
> > > >>>>> > make the semantics of this field easier to understand. Broker
> can
> > > >>>>> > distinguish whether a replica is offline due to broker failure
> or
> > > >>>>> disk
> > > >>>>> > failure by checking whether a broker is live in the
> > > >>>>> UpdateMetadataRequest.
> > > >>>>> >
> > > >>>>> >
> > > >>>>> > >
> > > >>>>> > > 5. Tools: Could we add some kind of support in the tool to
> list
> > > >>>>> offline
> > > >>>>> > > directories?
> > > >>>>> > >
> > > >>>>> >
> > > >>>>> > In KIP-112 we don't have tools to list offline directories
> > because
> > > >>>>> we have
> > > >>>>> > intentionally avoided exposing log directory information (e.g.
> > log
> > > >>>>> > directory path) to user or other brokers. I think we can add
> this
> > > >>>>> feature
> > > >>>>> > in KIP-113, in which we will have DescribeDirsRequest to list
> log
> > > >>>>> directory
> > > >>>>> > information (e.g. partition assignment, path, size) needed for
> > > >>>>> rebalance.
> > > >>>>> >
> > > >>>>> >
> > > >>>>> Since we are introducing a new failure mode, if a replica becomes
> > > >>>>> offline
> > > >>>>> due to failure in log directories, the first thing an admin wants
> > to
> > > >>>>> know
> > > >>>>> is which log directories are offline from the broker's
> perspective.
> > > >>>>> So,
> > > >>>>> including such a tool will be useful. Do you plan to do KIP-112
> and
> > > >>>>> KIP-113
> > > >>>>>  in the same release?
> > > >>>>>
> > > >>>>>
> > > >>>> Yes, I agree that including such a tool is using. This is probably
> > > >>>> better to be added in KIP-113 because we need DescribeDirsRequest
> to
> > > get
> > > >>>> this information. I will update KIP-113 to include this tool.
> > > >>>>
> > > >>>> I plan to do KIP-112 and KIP-113 separately to make each KIP and
> > their
> > > >>>> patch easier to review. I don't have any plan about which release
> to
> > > have
> > > >>>> these KIPs. My plan is to both of them ASAP. Is there particular
> > > timeline
> > > >>>> you prefer for code of these two KIPs to checked-in?
> > > >>>>
> > > >>>>
> > > >>>>> >
> > > >>>>> > >
> > > >>>>> > > 6. Metrics: Could we add some metrics to show offline
> > > directories?
> > > >>>>> > >
> > > >>>>> >
> > > >>>>> > Sure. I think it makes sense to have each broker report its
> > number
> > > of
> > > >>>>> > offline replicas and offline log directories. The previous
> metric
> > > >>>>> was put
> > > >>>>> > in KIP-113. I just added both metrics in KIP-112.
> > > >>>>> >
> > > >>>>> >
> > > >>>>> > >
> > > >>>>> > > 7. There are still references to kafka-log-dirs.sh. Are they
> > > valid?
> > > >>>>> > >
> > > >>>>> >
> > > >>>>> > My bad. I just removed this from "Changes in Operational
> > > Procedures"
> > > >>>>> and
> > > >>>>> > "Test Plan" in the KIP.
> > > >>>>> >
> > > >>>>> >
> > > >>>>> > >
> > > >>>>> > > 8. Do you think KIP-113 is ready for review? One thing that
> > > KIP-113
> > > >>>>> > > mentions during partition reassignment is to first send
> > > >>>>> > > LeaderAndIsrRequest, followed by ChangeReplicaDirRequest. It
> > > seems
> > > >>>>> it's
> > > >>>>> > > better if the replicas are created in the right log directory
> > in
> > > >>>>> the
> > > >>>>> > first
> > > >>>>> > > place? The reason that I brought it up here is because it may
> > > >>>>> affect the
> > > >>>>> > > protocol of LeaderAndIsrRequest.
> > > >>>>> > >
> > > >>>>> >
> > > >>>>> > Yes, KIP-113 is ready for review. The advantage of the current
> > > >>>>> design is
> > > >>>>> > that we can keep LeaderAndIsrRequest log-direcotry-agnostic.
> The
> > > >>>>> > implementation would be much easier to read if all log related
> > > logic
> > > >>>>> (e.g.
> > > >>>>> > various errors) are put in ChangeReplicadIRrequest and the code
> > > path
> > > >>>>> of
> > > >>>>> > handling replica movement is separated from leadership
> handling.
> > > >>>>> >
> > > >>>>> > In other words, I think Kafka may be easier to develop in the
> > long
> > > >>>>> term if
> > > >>>>> > we separate these two requests.
> > > >>>>> >
> > > >>>>> > I agree that ideally we want to create replicas in the right
> log
> > > >>>>> directory
> > > >>>>> > in the first place. But I am not sure if there is any
> performance
> > > or
> > > >>>>> > correctness concern with the existing way of moving it after it
> > is
> > > >>>>> created.
> > > >>>>> > Besides, does this decision affect the change proposed in
> > KIP-112?
> > > >>>>> >
> > > >>>>> >
> > > >>>>> I am just wondering if you have considered including the log
> > > directory
> > > >>>>> for
> > > >>>>> the replicas in the LeaderAndIsrRequest.
> > > >>>>>
> > > >>>>>
> > > >>>> Yeah I have thought about this idea, but only briefly. I rejected
> > this
> > > >>>> idea because log directory is broker's local information and I
> > prefer
> > > not
> > > >>>> to expose local config information to the cluster through
> > > >>>> LeaderAndIsrRequest.
> > > >>>>
> > > >>>>
> > > >>>>> 9. Could you describe when the offline replicas due to log
> > directory
> > > >>>>> failure are removed from the replica fetch threads?
> > > >>>>>
> > > >>>>
> > > >>>> Yes. If the offline replica was a leader, either a new leader is
> > > >>>> elected or all follower brokers will stop fetching for this
> > > partition. If
> > > >>>> the offline replica is a follower, the broker will stop fetching
> for
> > > this
> > > >>>> replica immediately. A broker stops fetching data for a replica by
> > > removing
> > > >>>> the replica from the replica fetch threads. I have updated the KIP
> > to
> > > >>>> clarify it.
> > > >>>>
> > > >>>>
> > > >>>>>
> > > >>>>> 10. The wiki mentioned changing the log directory to a file for
> > > >>>>> simulating
> > > >>>>> disk failure in system tests. Could we just change the permission
> > of
> > > >>>>> the
> > > >>>>> log directory to 000 to simulate that?
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>> Sure,
> > > >>>>
> > > >>>>
> > > >>>>>
> > > >>>>> Thanks,
> > > >>>>>
> > > >>>>> Jun
> > > >>>>>
> > > >>>>>
> > > >>>>> > > Jun
> > > >>>>> > >
> > > >>>>> > > On Fri, Feb 10, 2017 at 9:53 AM, Dong Lin <
> lindon...@gmail.com
> > >
> > > >>>>> wrote:
> > > >>>>> > >
> > > >>>>> > > > Hi Jun,
> > > >>>>> > > >
> > > >>>>> > > > Can I replace zookeeper access with direct RPC for both ISR
> > > >>>>> > notification
> > > >>>>> > > > and disk failure notification in a future KIP, or do you
> feel
> > > we
> > > >>>>> should
> > > >>>>> > > do
> > > >>>>> > > > it in this KIP?
> > > >>>>> > > >
> > > >>>>> > > > Hi Eno, Grant and everyone,
> > > >>>>> > > >
> > > >>>>> > > > Is there further improvement you would like to see with
> this
> > > KIP?
> > > >>>>> > > >
> > > >>>>> > > > Thanks you all for the comments,
> > > >>>>> > > >
> > > >>>>> > > > Dong
> > > >>>>> > > >
> > > >>>>> > > >
> > > >>>>> > > >
> > > >>>>> > > > On Thu, Feb 9, 2017 at 4:45 PM, Dong Lin <
> > lindon...@gmail.com>
> > > >>>>> wrote:
> > > >>>>> > > >
> > > >>>>> > > > >
> > > >>>>> > > > >
> > > >>>>> > > > > On Thu, Feb 9, 2017 at 3:37 PM, Colin McCabe <
> > > >>>>> cmcc...@apache.org>
> > > >>>>> > > wrote:
> > > >>>>> > > > >
> > > >>>>> > > > >> On Thu, Feb 9, 2017, at 11:40, Dong Lin wrote:
> > > >>>>> > > > >> > Thanks for all the comments Colin!
> > > >>>>> > > > >> >
> > > >>>>> > > > >> > To answer your questions:
> > > >>>>> > > > >> > - Yes, a broker will shutdown if all its log
> directories
> > > >>>>> are bad.
> > > >>>>> > > > >>
> > > >>>>> > > > >> That makes sense.  Can you add this to the writeup?
> > > >>>>> > > > >>
> > > >>>>> > > > >
> > > >>>>> > > > > Sure. This has already been added. You can find it here
> > > >>>>> > > > > <https://cwiki.apache.org/confluence/pages/
> > diffpagesbyversio
> > > >>>>> n.action
> > > >>>>> > ?
> > > >>>>> > > > pageId=67638402&selectedPageVersions=9&
> selectedPageVersions=
> > > 10>
> > > >>>>> > > > > .
> > > >>>>> > > > >
> > > >>>>> > > > >
> > > >>>>> > > > >>
> > > >>>>> > > > >> > - I updated the KIP to explicitly state that a log
> > > >>>>> directory will
> > > >>>>> > be
> > > >>>>> > > > >> > assumed to be good until broker sees IOException when
> it
> > > >>>>> tries to
> > > >>>>> > > > access
> > > >>>>> > > > >> > the log directory.
> > > >>>>> > > > >>
> > > >>>>> > > > >> Thanks.
> > > >>>>> > > > >>
> > > >>>>> > > > >> > - Controller doesn't explicitly know whether there is
> > new
> > > >>>>> log
> > > >>>>> > > > directory
> > > >>>>> > > > >> > or
> > > >>>>> > > > >> > not. All controller knows is whether replicas are
> online
> > > or
> > > >>>>> > offline
> > > >>>>> > > > >> based
> > > >>>>> > > > >> > on LeaderAndIsrResponse. According to the existing
> Kafka
> > > >>>>> > > > implementation,
> > > >>>>> > > > >> > controller will always send LeaderAndIsrRequest to a
> > > broker
> > > >>>>> after
> > > >>>>> > it
> > > >>>>> > > > >> > bounces.
> > > >>>>> > > > >>
> > > >>>>> > > > >> I thought so.  It's good to clarify, though.  Do you
> think
> > > >>>>> it's
> > > >>>>> > worth
> > > >>>>> > > > >> adding a quick discussion of this on the wiki?
> > > >>>>> > > > >>
> > > >>>>> > > > >
> > > >>>>> > > > > Personally I don't think it is needed. If broker starts
> > with
> > > >>>>> no bad
> > > >>>>> > log
> > > >>>>> > > > > directory, everything should work it is and we should not
> > > need
> > > >>>>> to
> > > >>>>> > > clarify
> > > >>>>> > > > > it. The KIP has already covered the scenario when a
> broker
> > > >>>>> starts
> > > >>>>> > with
> > > >>>>> > > > bad
> > > >>>>> > > > > log directory. Also, the KIP doesn't claim or hint that
> we
> > > >>>>> support
> > > >>>>> > > > dynamic
> > > >>>>> > > > > addition of new log directories. I think we are good.
> > > >>>>> > > > >
> > > >>>>> > > > >
> > > >>>>> > > > >> best,
> > > >>>>> > > > >> Colin
> > > >>>>> > > > >>
> > > >>>>> > > > >> >
> > > >>>>> > > > >> > Please see this
> > > >>>>> > > > >> > <https://cwiki.apache.org/conf
> > > >>>>> luence/pages/diffpagesbyversio
> > > >>>>> > > > >> n.action?pageId=67638402&selectedPageVersions=9&
> > > >>>>> > > > selectedPageVersions=10>
> > > >>>>> > > > >> > for the change of the KIP.
> > > >>>>> > > > >> >
> > > >>>>> > > > >> > On Thu, Feb 9, 2017 at 11:04 AM, Colin McCabe <
> > > >>>>> cmcc...@apache.org
> > > >>>>> > >
> > > >>>>> > > > >> wrote:
> > > >>>>> > > > >> >
> > > >>>>> > > > >> > > On Thu, Feb 9, 2017, at 11:03, Colin McCabe wrote:
> > > >>>>> > > > >> > > > Thanks, Dong L.
> > > >>>>> > > > >> > > >
> > > >>>>> > > > >> > > > Do we plan on bringing down the broker process
> when
> > > all
> > > >>>>> log
> > > >>>>> > > > >> directories
> > > >>>>> > > > >> > > > are offline?
> > > >>>>> > > > >> > > >
> > > >>>>> > > > >> > > > Can you explicitly state on the KIP that the log
> > dirs
> > > >>>>> are all
> > > >>>>> > > > >> considered
> > > >>>>> > > > >> > > > good after the broker process is bounced?  It
> seems
> > > >>>>> like an
> > > >>>>> > > > >> important
> > > >>>>> > > > >> > > > thing to be clear about.  Also, perhaps discuss
> how
> > > the
> > > >>>>> > > controller
> > > >>>>> > > > >> > > > becomes aware of the newly good log directories
> > after
> > > a
> > > >>>>> broker
> > > >>>>> > > > >> bounce
> > > >>>>> > > > >> > > > (and whether this triggers re-election).
> > > >>>>> > > > >> > >
> > > >>>>> > > > >> > > I meant to write, all the log dirs where the broker
> > can
> > > >>>>> still
> > > >>>>> > read
> > > >>>>> > > > the
> > > >>>>> > > > >> > > index and some other files.  Clearly, log dirs that
> > are
> > > >>>>> > completely
> > > >>>>> > > > >> > > inaccessible will still be considered bad after a
> > broker
> > > >>>>> process
> > > >>>>> > > > >> bounce.
> > > >>>>> > > > >> > >
> > > >>>>> > > > >> > > best,
> > > >>>>> > > > >> > > Colin
> > > >>>>> > > > >> > >
> > > >>>>> > > > >> > > >
> > > >>>>> > > > >> > > > +1 (non-binding) aside from that
> > > >>>>> > > > >> > > >
> > > >>>>> > > > >> > > >
> > > >>>>> > > > >> > > >
> > > >>>>> > > > >> > > > On Wed, Feb 8, 2017, at 00:47, Dong Lin wrote:
> > > >>>>> > > > >> > > > > Hi all,
> > > >>>>> > > > >> > > > >
> > > >>>>> > > > >> > > > > Thank you all for the helpful suggestion. I have
> > > >>>>> updated the
> > > >>>>> > > KIP
> > > >>>>> > > > >> to
> > > >>>>> > > > >> > > > > address
> > > >>>>> > > > >> > > > > the comments received so far. See here
> > > >>>>> > > > >> > > > > <https://cwiki.apache.org/conf
> > > >>>>> > luence/pages/diffpagesbyversio
> > > >>>>> > > > >> n.action?
> > > >>>>> > > > >> > > pageId=67638402&selectedPageVe
> > > >>>>> rsions=8&selectedPageVersions=
> > > >>>>> > 9>to
> > > >>>>> > > > >> > > > > read the changes of the KIP. Here is a summary
> of
> > > >>>>> change:
> > > >>>>> > > > >> > > > >
> > > >>>>> > > > >> > > > > - Updated the Proposed Change section to change
> > the
> > > >>>>> recovery
> > > >>>>> > > > >> steps.
> > > >>>>> > > > >> > > After
> > > >>>>> > > > >> > > > > this change, broker will also create replica as
> > long
> > > >>>>> as all
> > > >>>>> > > log
> > > >>>>> > > > >> > > > > directories
> > > >>>>> > > > >> > > > > are working.
> > > >>>>> > > > >> > > > > - Removed kafka-log-dirs.sh from this KIP since
> > user
> > > >>>>> no
> > > >>>>> > longer
> > > >>>>> > > > >> needs to
> > > >>>>> > > > >> > > > > use
> > > >>>>> > > > >> > > > > it for recovery from bad disks.
> > > >>>>> > > > >> > > > > - Explained how the znode
> controller_managed_state
> > > is
> > > >>>>> > managed
> > > >>>>> > > in
> > > >>>>> > > > >> the
> > > >>>>> > > > >> > > > > Public
> > > >>>>> > > > >> > > > > interface section.
> > > >>>>> > > > >> > > > > - Explained what happens during controller
> > failover,
> > > >>>>> > partition
> > > >>>>> > > > >> > > > > reassignment
> > > >>>>> > > > >> > > > > and topic deletion in the Proposed Change
> section.
> > > >>>>> > > > >> > > > > - Updated Future Work section to include the
> > > following
> > > >>>>> > > potential
> > > >>>>> > > > >> > > > > improvements
> > > >>>>> > > > >> > > > >   - Let broker notify controller of ISR change
> and
> > > >>>>> disk
> > > >>>>> > state
> > > >>>>> > > > >> change
> > > >>>>> > > > >> > > via
> > > >>>>> > > > >> > > > > RPC instead of using zookeeper
> > > >>>>> > > > >> > > > >   - Handle various failure scenarios (e.g. slow
> > > disk)
> > > >>>>> on a
> > > >>>>> > > > >> case-by-case
> > > >>>>> > > > >> > > > > basis. For example, we may want to detect slow
> > disk
> > > >>>>> and
> > > >>>>> > > consider
> > > >>>>> > > > >> it as
> > > >>>>> > > > >> > > > > offline.
> > > >>>>> > > > >> > > > >   - Allow admin to mark a directory as bad so
> that
> > > it
> > > >>>>> will
> > > >>>>> > not
> > > >>>>> > > > be
> > > >>>>> > > > >> used.
> > > >>>>> > > > >> > > > >
> > > >>>>> > > > >> > > > > Thanks,
> > > >>>>> > > > >> > > > > Dong
> > > >>>>> > > > >> > > > >
> > > >>>>> > > > >> > > > >
> > > >>>>> > > > >> > > > >
> > > >>>>> > > > >> > > > > On Tue, Feb 7, 2017 at 5:23 PM, Dong Lin <
> > > >>>>> > lindon...@gmail.com
> > > >>>>> > > >
> > > >>>>> > > > >> wrote:
> > > >>>>> > > > >> > > > >
> > > >>>>> > > > >> > > > > > Hey Eno,
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > Thanks much for the comment!
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > I still think the complexity added to Kafka is
> > > >>>>> justified
> > > >>>>> > by
> > > >>>>> > > > its
> > > >>>>> > > > >> > > benefit.
> > > >>>>> > > > >> > > > > > Let me provide my reasons below.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > 1) The additional logic is easy to understand
> > and
> > > >>>>> thus its
> > > >>>>> > > > >> complexity
> > > >>>>> > > > >> > > > > > should be reasonable.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > On the broker side, it needs to catch
> exception
> > > when
> > > >>>>> > access
> > > >>>>> > > > log
> > > >>>>> > > > >> > > directory,
> > > >>>>> > > > >> > > > > > mark log directory and all its replicas as
> > > offline,
> > > >>>>> notify
> > > >>>>> > > > >> > > controller by
> > > >>>>> > > > >> > > > > > writing the zookeeper notification path, and
> > > >>>>> specify error
> > > >>>>> > > in
> > > >>>>> > > > >> > > > > > LeaderAndIsrResponse. On the controller side,
> it
> > > >>>>> will
> > > >>>>> > > listener
> > > >>>>> > > > >> to
> > > >>>>> > > > >> > > > > > zookeeper for disk failure notification, learn
> > > about
> > > >>>>> > offline
> > > >>>>> > > > >> > > replicas in
> > > >>>>> > > > >> > > > > > the LeaderAndIsrResponse, and take offline
> > > replicas
> > > >>>>> into
> > > >>>>> > > > >> > > consideration when
> > > >>>>> > > > >> > > > > > electing leaders. It also mark replica as
> > created
> > > in
> > > >>>>> > > zookeeper
> > > >>>>> > > > >> and
> > > >>>>> > > > >> > > use it
> > > >>>>> > > > >> > > > > > to determine whether a replica is created.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > That is all the logic we need to add in
> Kafka. I
> > > >>>>> > personally
> > > >>>>> > > > feel
> > > >>>>> > > > >> > > this is
> > > >>>>> > > > >> > > > > > easy to reason about.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > 2) The additional code is not much.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > I expect the code for KIP-112 to be around
> 1100
> > > >>>>> lines new
> > > >>>>> > > > code.
> > > >>>>> > > > >> > > Previously
> > > >>>>> > > > >> > > > > > I have implemented a prototype of a slightly
> > > >>>>> different
> > > >>>>> > > design
> > > >>>>> > > > >> (see
> > > >>>>> > > > >> > > here
> > > >>>>> > > > >> > > > > > <https://docs.google.com/docum
> > > >>>>> ent/d/1Izza0SBmZMVUBUt9s_
> > > >>>>> > > > >> > > -Dqi3D8e0KGJQYW8xgEdRsgAI/edit>)
> > > >>>>> > > > >> > > > > > and uploaded it to github (see here
> > > >>>>> > > > >> > > > > > <https://github.com/lindong28/kafka/tree/JBOD
> > >).
> > > >>>>> The
> > > >>>>> > patch
> > > >>>>> > > > >> changed
> > > >>>>> > > > >> > > 33
> > > >>>>> > > > >> > > > > > files, added 1185 lines and deleted 183 lines.
> > The
> > > >>>>> size of
> > > >>>>> > > > >> prototype
> > > >>>>> > > > >> > > patch
> > > >>>>> > > > >> > > > > > is actually smaller than patch of KIP-107 (see
> > > here
> > > >>>>> > > > >> > > > > > <https://github.com/apache/kafka/pull/2476>)
> > > which
> > > >>>>> is
> > > >>>>> > > already
> > > >>>>> > > > >> > > accepted.
> > > >>>>> > > > >> > > > > > The KIP-107 patch changed 49 files, added 1349
> > > >>>>> lines and
> > > >>>>> > > > >> deleted 141
> > > >>>>> > > > >> > > lines.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > 3) Comparison with
> > one-broker-per-multiple-volume
> > > s
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > This KIP can improve the availability of Kafka
> > in
> > > >>>>> this
> > > >>>>> > case
> > > >>>>> > > > such
> > > >>>>> > > > >> > > that one
> > > >>>>> > > > >> > > > > > failed volume doesn't bring down the entire
> > > broker.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > 4) Comparison with one-broker-per-volume
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > If each volume maps to multiple disks, then we
> > > >>>>> still have
> > > >>>>> > > > >> similar
> > > >>>>> > > > >> > > problem
> > > >>>>> > > > >> > > > > > such that the broker will fail if any disk of
> > the
> > > >>>>> volume
> > > >>>>> > > > failed.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > If each volume maps to one disk, it means that
> > we
> > > >>>>> need to
> > > >>>>> > > > >> deploy 10
> > > >>>>> > > > >> > > > > > brokers on a machine if the machine has 10
> > disks.
> > > I
> > > >>>>> will
> > > >>>>> > > > >> explain the
> > > >>>>> > > > >> > > > > > concern with this approach in order of their
> > > >>>>> importance.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > - It is weird if we were to tell kafka user to
> > > >>>>> deploy 50
> > > >>>>> > > > >> brokers on a
> > > >>>>> > > > >> > > > > > machine of 50 disks.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > - Either when user deploys Kafka on a
> commercial
> > > >>>>> cloud
> > > >>>>> > > > platform
> > > >>>>> > > > >> or
> > > >>>>> > > > >> > > when
> > > >>>>> > > > >> > > > > > user deploys their own cluster, the size or
> > > largest
> > > >>>>> disk
> > > >>>>> > is
> > > >>>>> > > > >> usually
> > > >>>>> > > > >> > > > > > limited. There will be scenarios where user
> want
> > > to
> > > >>>>> > increase
> > > >>>>> > > > >> broker
> > > >>>>> > > > >> > > > > > capacity by having multiple disks per broker.
> > This
> > > >>>>> JBOD
> > > >>>>> > KIP
> > > >>>>> > > > >> makes it
> > > >>>>> > > > >> > > > > > feasible without hurting availability due to
> > > single
> > > >>>>> disk
> > > >>>>> > > > >> failure.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > - Automatic load rebalance across disks will
> be
> > > >>>>> easier and
> > > >>>>> > > > more
> > > >>>>> > > > >> > > flexible
> > > >>>>> > > > >> > > > > > if one broker has multiple disks. This can be
> > > >>>>> future work.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > - There is performance concern when you deploy
> > 10
> > > >>>>> broker
> > > >>>>> > > vs. 1
> > > >>>>> > > > >> > > broker on
> > > >>>>> > > > >> > > > > > one machine. The metadata the cluster,
> including
> > > >>>>> > > FetchRequest,
> > > >>>>> > > > >> > > > > > ProduceResponse, MetadataRequest and so on
> will
> > > all
> > > >>>>> be 10X
> > > >>>>> > > > >> more. The
> > > >>>>> > > > >> > > > > > packet-per-second will be 10X higher which may
> > > limit
> > > >>>>> > > > >> performance if
> > > >>>>> > > > >> > > pps is
> > > >>>>> > > > >> > > > > > the performance bottleneck. The number of
> socket
> > > on
> > > >>>>> the
> > > >>>>> > > > machine
> > > >>>>> > > > >> is
> > > >>>>> > > > >> > > 10X
> > > >>>>> > > > >> > > > > > higher. And the number of replication thread
> > will
> > > >>>>> be 100X
> > > >>>>> > > > more.
> > > >>>>> > > > >> The
> > > >>>>> > > > >> > > impact
> > > >>>>> > > > >> > > > > > will be more significant with increasing
> number
> > of
> > > >>>>> disks
> > > >>>>> > per
> > > >>>>> > > > >> > > machine. Thus
> > > >>>>> > > > >> > > > > > it will limit Kakfa's scalability in the long
> > > term.
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > Thanks,
> > > >>>>> > > > >> > > > > > Dong
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > > On Tue, Feb 7, 2017 at 1:51 AM, Eno Thereska <
> > > >>>>> > > > >> eno.there...@gmail.com
> > > >>>>> > > > >> > > >
> > > >>>>> > > > >> > > > > > wrote:
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > > > > >> Hi Dong,
> > > >>>>> > > > >> > > > > >>
> > > >>>>> > > > >> > > > > >> To simplify the discussion today, on my part
> > I'll
> > > >>>>> zoom
> > > >>>>> > into
> > > >>>>> > > > one
> > > >>>>> > > > >> > > thing
> > > >>>>> > > > >> > > > > >> only:
> > > >>>>> > > > >> > > > > >>
> > > >>>>> > > > >> > > > > >> - I'll discuss the options called below :
> > > >>>>> > > > >> "one-broker-per-disk" or
> > > >>>>> > > > >> > > > > >> "one-broker-per-few-disks".
> > > >>>>> > > > >> > > > > >>
> > > >>>>> > > > >> > > > > >> - I completely buy the JBOD vs RAID arguments
> > so
> > > >>>>> there is
> > > >>>>> > > no
> > > >>>>> > > > >> need to
> > > >>>>> > > > >> > > > > >> discuss that part for me. I buy it that JBODs
> > are
> > > >>>>> good.
> > > >>>>> > > > >> > > > > >>
> > > >>>>> > > > >> > > > > >> I find the terminology can be improved a bit.
> > > >>>>> Ideally
> > > >>>>> > we'd
> > > >>>>> > > be
> > > >>>>> > > > >> > > talking
> > > >>>>> > > > >> > > > > >> about volumes, not disks. Just to make it
> clear
> > > >>>>> that
> > > >>>>> > Kafka
> > > >>>>> > > > >> > > understand
> > > >>>>> > > > >> > > > > >> volumes/directories, not individual raw
> disks.
> > So
> > > >>>>> by
> > > >>>>> > > > >> > > > > >> "one-broker-per-few-disks" what I mean is
> that
> > > the
> > > >>>>> admin
> > > >>>>> > > can
> > > >>>>> > > > >> pool a
> > > >>>>> > > > >> > > few
> > > >>>>> > > > >> > > > > >> disks together to create a volume/directory
> and
> > > >>>>> give that
> > > >>>>> > > to
> > > >>>>> > > > >> Kafka.
> > > >>>>> > > > >> > > > > >>
> > > >>>>> > > > >> > > > > >>
> > > >>>>> > > > >> > > > > >> The kernel of my question will be that the
> > admin
> > > >>>>> already
> > > >>>>> > > has
> > > >>>>> > > > >> tools
> > > >>>>> > > > >> > > to 1)
> > > >>>>> > > > >> > > > > >> create volumes/directories from a JBOD and 2)
> > > >>>>> start a
> > > >>>>> > > broker
> > > >>>>> > > > >> on a
> > > >>>>> > > > >> > > desired
> > > >>>>> > > > >> > > > > >> machine and 3) assign a broker resources
> like a
> > > >>>>> > directory.
> > > >>>>> > > I
> > > >>>>> > > > >> claim
> > > >>>>> > > > >> > > that
> > > >>>>> > > > >> > > > > >> those tools are sufficient to optimise
> resource
> > > >>>>> > allocation.
> > > >>>>> > > > I
> > > >>>>> > > > >> > > understand
> > > >>>>> > > > >> > > > > >> that a broker could manage point 3) itself,
> ie
> > > >>>>> juggle the
> > > >>>>> > > > >> > > directories. My
> > > >>>>> > > > >> > > > > >> question is whether the complexity added to
> > Kafka
> > > >>>>> is
> > > >>>>> > > > justified.
> > > >>>>> > > > >> > > > > >> Operationally it seems to me an admin will
> > still
> > > >>>>> have to
> > > >>>>> > do
> > > >>>>> > > > >> all the
> > > >>>>> > > > >> > > three
> > > >>>>> > > > >> > > > > >> items above.
> > > >>>>> > > > >> > > > > >>
> > > >>>>> > > > >> > > > > >> Looking forward to the discussion
> > > >>>>> > > > >> > > > > >> Thanks
> > > >>>>> > > > >> > > > > >> Eno
> > > >>>>> > > > >> > > > > >>
> > > >>>>> > > > >> > > > > >>
> > > >>>>> > > > >> > > > > >> > On 1 Feb 2017, at 17:21, Dong Lin <
> > > >>>>> lindon...@gmail.com
> > > >>>>> > >
> > > >>>>> > > > >> wrote:
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > Hey Eno,
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > Thanks much for the review.
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > I think your suggestion is to split disks
> of
> > a
> > > >>>>> machine
> > > >>>>> > > into
> > > >>>>> > > > >> > > multiple
> > > >>>>> > > > >> > > > > >> disk
> > > >>>>> > > > >> > > > > >> > sets and run one broker per disk set. Yeah
> > this
> > > >>>>> is
> > > >>>>> > > similar
> > > >>>>> > > > to
> > > >>>>> > > > >> > > Colin's
> > > >>>>> > > > >> > > > > >> > suggestion of one-broker-per-disk, which we
> > > have
> > > >>>>> > > evaluated
> > > >>>>> > > > at
> > > >>>>> > > > >> > > LinkedIn
> > > >>>>> > > > >> > > > > >> and
> > > >>>>> > > > >> > > > > >> > considered it to be a good short term
> > approach.
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > As of now I don't think any of these
> approach
> > > is
> > > >>>>> a
> > > >>>>> > better
> > > >>>>> > > > >> > > alternative in
> > > >>>>> > > > >> > > > > >> > the long term. I will summarize these
> here. I
> > > >>>>> have put
> > > >>>>> > > > these
> > > >>>>> > > > >> > > reasons in
> > > >>>>> > > > >> > > > > >> the
> > > >>>>> > > > >> > > > > >> > KIP's motivation section and rejected
> > > alternative
> > > >>>>> > > section.
> > > >>>>> > > > I
> > > >>>>> > > > >> am
> > > >>>>> > > > >> > > happy to
> > > >>>>> > > > >> > > > > >> > discuss more and I would certainly like to
> > use
> > > an
> > > >>>>> > > > alternative
> > > >>>>> > > > >> > > solution
> > > >>>>> > > > >> > > > > >> that
> > > >>>>> > > > >> > > > > >> > is easier to do with better performance.
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > - JBOD vs. RAID-10: if we switch from
> RAID-10
> > > >>>>> with
> > > >>>>> > > > >> > > > > >> replication-factoer=2 to
> > > >>>>> > > > >> > > > > >> > JBOD with replicatio-factor=3, we get 25%
> > > >>>>> reduction in
> > > >>>>> > > disk
> > > >>>>> > > > >> usage
> > > >>>>> > > > >> > > and
> > > >>>>> > > > >> > > > > >> > doubles the tolerance of broker failure
> > before
> > > >>>>> data
> > > >>>>> > > > >> > > unavailability from
> > > >>>>> > > > >> > > > > >> 1
> > > >>>>> > > > >> > > > > >> > to 2. This is pretty huge gain for any
> > company
> > > >>>>> that
> > > >>>>> > uses
> > > >>>>> > > > >> Kafka at
> > > >>>>> > > > >> > > large
> > > >>>>> > > > >> > > > > >> > scale.
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > - JBOD vs. one-broker-per-disk: The benefit
> > of
> > > >>>>> > > > >> > > one-broker-per-disk is
> > > >>>>> > > > >> > > > > >> that
> > > >>>>> > > > >> > > > > >> > no major code change is needed in Kafka.
> > Among
> > > >>>>> the
> > > >>>>> > > > >> disadvantage of
> > > >>>>> > > > >> > > > > >> > one-broker-per-disk summarized in the KIP
> and
> > > >>>>> previous
> > > >>>>> > > > email
> > > >>>>> > > > >> with
> > > >>>>> > > > >> > > Colin,
> > > >>>>> > > > >> > > > > >> > the biggest one is the 15% throughput loss
> > > >>>>> compared to
> > > >>>>> > > JBOD
> > > >>>>> > > > >> and
> > > >>>>> > > > >> > > less
> > > >>>>> > > > >> > > > > >> > flexibility to balance across disks.
> Further,
> > > it
> > > >>>>> > probably
> > > >>>>> > > > >> requires
> > > >>>>> > > > >> > > > > >> change
> > > >>>>> > > > >> > > > > >> > to internal deployment tools at various
> > > >>>>> companies to
> > > >>>>> > deal
> > > >>>>> > > > >> with
> > > >>>>> > > > >> > > > > >> > one-broker-per-disk setup.
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > - JBOD vs. RAID-0: This is the setup that
> > used
> > > at
> > > >>>>> > > > Microsoft.
> > > >>>>> > > > >> The
> > > >>>>> > > > >> > > > > >> problem is
> > > >>>>> > > > >> > > > > >> > that a broker becomes unavailable if any
> disk
> > > >>>>> fail.
> > > >>>>> > > Suppose
> > > >>>>> > > > >> > > > > >> > replication-factor=2 and there are 10 disks
> > per
> > > >>>>> > machine.
> > > >>>>> > > > >> Then the
> > > >>>>> > > > >> > > > > >> > probability of of any message becomes
> > > >>>>> unavailable due
> > > >>>>> > to
> > > >>>>> > > > disk
> > > >>>>> > > > >> > > failure
> > > >>>>> > > > >> > > > > >> with
> > > >>>>> > > > >> > > > > >> > RAID-0 is 100X higher than that with JBOD.
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > - JBOD vs. one-broker-per-few-disks:
> > > >>>>> > > > one-broker-per-few-disk
> > > >>>>> > > > >> is
> > > >>>>> > > > >> > > > > >> somewhere
> > > >>>>> > > > >> > > > > >> > between one-broker-per-disk and RAID-0. So
> it
> > > >>>>> carries
> > > >>>>> > an
> > > >>>>> > > > >> averaged
> > > >>>>> > > > >> > > > > >> > disadvantages of these two approaches.
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > To answer your question regarding, I think
> it
> > > is
> > > >>>>> > > reasonable
> > > >>>>> > > > >> to
> > > >>>>> > > > >> > > mange
> > > >>>>> > > > >> > > > > >> disk
> > > >>>>> > > > >> > > > > >> > in Kafka. By "managing disks" we mean the
> > > >>>>> management of
> > > >>>>> > > > >> > > assignment of
> > > >>>>> > > > >> > > > > >> > replicas across disks. Here are my reasons
> in
> > > >>>>> more
> > > >>>>> > > detail:
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > - I don't think this KIP is a big step
> > change.
> > > By
> > > >>>>> > > allowing
> > > >>>>> > > > >> user to
> > > >>>>> > > > >> > > > > >> > configure Kafka to run multiple log
> > directories
> > > >>>>> or
> > > >>>>> > disks
> > > >>>>> > > as
> > > >>>>> > > > >> of
> > > >>>>> > > > >> > > now, it
> > > >>>>> > > > >> > > > > >> is
> > > >>>>> > > > >> > > > > >> > implicit that Kafka manages disks. It is
> just
> > > >>>>> not a
> > > >>>>> > > > complete
> > > >>>>> > > > >> > > feature.
> > > >>>>> > > > >> > > > > >> > Microsoft and probably other companies are
> > > using
> > > >>>>> this
> > > >>>>> > > > feature
> > > >>>>> > > > >> > > under the
> > > >>>>> > > > >> > > > > >> > undesirable effect that a broker will fail
> > any
> > > >>>>> if any
> > > >>>>> > > disk
> > > >>>>> > > > >> fail.
> > > >>>>> > > > >> > > It is
> > > >>>>> > > > >> > > > > >> good
> > > >>>>> > > > >> > > > > >> > to complete this feature.
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > - I think it is reasonable to manage disk
> in
> > > >>>>> Kafka. One
> > > >>>>> > > of
> > > >>>>> > > > >> the
> > > >>>>> > > > >> > > most
> > > >>>>> > > > >> > > > > >> > important work that Kafka is doing is to
> > > >>>>> determine the
> > > >>>>> > > > >> replica
> > > >>>>> > > > >> > > > > >> assignment
> > > >>>>> > > > >> > > > > >> > across brokers and make sure enough copies
> > of a
> > > >>>>> given
> > > >>>>> > > > >> replica is
> > > >>>>> > > > >> > > > > >> available.
> > > >>>>> > > > >> > > > > >> > I would argue that it is not much different
> > > than
> > > >>>>> > > > determining
> > > >>>>> > > > >> the
> > > >>>>> > > > >> > > replica
> > > >>>>> > > > >> > > > > >> > assignment across disk conceptually.
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > - I would agree that this KIP is improve
> > > >>>>> performance of
> > > >>>>> > > > >> Kafka at
> > > >>>>> > > > >> > > the
> > > >>>>> > > > >> > > > > >> cost
> > > >>>>> > > > >> > > > > >> > of more complexity inside Kafka, by
> switching
> > > >>>>> from
> > > >>>>> > > RAID-10
> > > >>>>> > > > to
> > > >>>>> > > > >> > > JBOD. I
> > > >>>>> > > > >> > > > > >> would
> > > >>>>> > > > >> > > > > >> > argue that this is a right direction. If we
> > can
> > > >>>>> gain
> > > >>>>> > 20%+
> > > >>>>> > > > >> > > performance by
> > > >>>>> > > > >> > > > > >> > managing NIC in Kafka as compared to
> existing
> > > >>>>> approach
> > > >>>>> > > and
> > > >>>>> > > > >> other
> > > >>>>> > > > >> > > > > >> > alternatives, I would say we should just do
> > it.
> > > >>>>> Such a
> > > >>>>> > > gain
> > > >>>>> > > > >> in
> > > >>>>> > > > >> > > > > >> performance,
> > > >>>>> > > > >> > > > > >> > or equivalently reduction in cost, can save
> > > >>>>> millions of
> > > >>>>> > > > >> dollars
> > > >>>>> > > > >> > > per year
> > > >>>>> > > > >> > > > > >> > for any company running Kafka at large
> scale.
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > Thanks,
> > > >>>>> > > > >> > > > > >> > Dong
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> > On Wed, Feb 1, 2017 at 5:41 AM, Eno
> Thereska
> > <
> > > >>>>> > > > >> > > eno.there...@gmail.com>
> > > >>>>> > > > >> > > > > >> wrote:
> > > >>>>> > > > >> > > > > >> >
> > > >>>>> > > > >> > > > > >> >> I'm coming somewhat late to the
> discussion,
> > > >>>>> apologies
> > > >>>>> > > for
> > > >>>>> > > > >> that.
> > > >>>>> > > > >> > > > > >> >>
> > > >>>>> > > > >> > > > > >> >> I'm worried about this proposal. It's
> moving
> > > >>>>> Kafka to
> > > >>>>> > a
> > > >>>>> > > > >> world
> > > >>>>> > > > >> > > where it
> > > >>>>> > > > >> > > > > >> >> manages disks. So in a sense, the scope of
> > the
> > > >>>>> KIP is
> > > >>>>> > > > >> limited,
> > > >>>>> > > > >> > > but the
> > > >>>>> > > > >> > > > > >> >> direction it sets for Kafka is quite a big
> > > step
> > > >>>>> > change.
> > > >>>>> > > > >> > > Fundamentally
> > > >>>>> > > > >> > > > > >> this
> > > >>>>> > > > >> > > > > >> >> is about balancing resources for a Kafka
> > > >>>>> broker. This
> > > >>>>> > > can
> > > >>>>> > > > be
> > > >>>>> > > > >> > > done by a
> > > >>>>> > > > >> > > > > >> >> tool, rather than by changing Kafka. E.g.,
> > the
> > > >>>>> tool
> > > >>>>> > > would
> > > >>>>> > > > >> take a
> > > >>>>> > > > >> > > bunch
> > > >>>>> > > > >> > > > > >> of
> > > >>>>> > > > >> > > > > >> >> disks together, create a volume over them
> > and
> > > >>>>> export
> > > >>>>> > > that
> > > >>>>> > > > >> to a
> > > >>>>> > > > >> > > Kafka
> > > >>>>> > > > >> > > > > >> broker
> > > >>>>> > > > >> > > > > >> >> (in addition to setting the memory limits
> > for
> > > >>>>> that
> > > >>>>> > > broker
> > > >>>>> > > > or
> > > >>>>> > > > >> > > limiting
> > > >>>>> > > > >> > > > > >> other
> > > >>>>> > > > >> > > > > >> >> resources). A different bunch of disks can
> > > then
> > > >>>>> make
> > > >>>>> > up
> > > >>>>> > > a
> > > >>>>> > > > >> second
> > > >>>>> > > > >> > > > > >> volume,
> > > >>>>> > > > >> > > > > >> >> and be used by another Kafka broker. This
> is
> > > >>>>> aligned
> > > >>>>> > > with
> > > >>>>> > > > >> what
> > > >>>>> > > > >> > > Colin is
> > > >>>>> > > > >> > > > > >> >> saying (as I understand it).
> > > >>>>> > > > >> > > > > >> >>
> > > >>>>> > > > >> > > > > >> >> Disks are not the only resource on a
> > machine,
> > > >>>>> there
> > > >>>>> > are
> > > >>>>> > > > >> several
> > > >>>>> > > > >> > > > > >> instances
> > > >>>>> > > > >> > > > > >> >> where multiple NICs are used for example.
> Do
> > > we
> > > >>>>> want
> > > >>>>> > > fine
> > > >>>>> > > > >> grained
> > > >>>>> > > > >> > > > > >> >> management of all these resources? I'd
> argue
> > > >>>>> that
> > > >>>>> > opens
> > > >>>>> > > us
> > > >>>>> > > > >> the
> > > >>>>> > > > >> > > system
> > > >>>>> > > > >> > > > > >> to a
> > > >>>>> > > > >> > > > > >> >> lot of complexity.
> > > >>>>> > > > >> > > > > >> >>
> > > >>>>> > > > >> > > > > >> >> Thanks
> > > >>>>> > > > >> > > > > >> >> Eno
> > > >>>>> > > > >> > > > > >> >>
> > > >>>>> > > > >> > > > > >> >>
> > > >>>>> > > > >> > > > > >> >>> On 1 Feb 2017, at 01:53, Dong Lin <
> > > >>>>> > lindon...@gmail.com
> > > >>>>> > > >
> > > >>>>> > > > >> wrote:
> > > >>>>> > > > >> > > > > >> >>>
> > > >>>>> > > > >> > > > > >> >>> Hi all,
> > > >>>>> > > > >> > > > > >> >>>
> > > >>>>> > > > >> > > > > >> >>> I am going to initiate the vote If there
> is
> > > no
> > > >>>>> > further
> > > >>>>> > > > >> concern
> > > >>>>> > > > >> > > with
> > > >>>>> > > > >> > > > > >> the
> > > >>>>> > > > >> > > > > >> >> KIP.
> > > >>>>> > > > >> > > > > >> >>>
> > > >>>>> > > > >> > > > > >> >>> Thanks,
> > > >>>>> > > > >> > > > > >> >>> Dong
> > > >>>>> > > > >> > > > > >> >>>
> > > >>>>> > > > >> > > > > >> >>>
> > > >>>>> > > > >> > > > > >> >>> On Fri, Jan 27, 2017 at 8:08 PM, radai <
> > > >>>>> > > > >> > > radai.rosenbl...@gmail.com>
> > > >>>>> > > > >> > > > > >> >> wrote:
> > > >>>>> > > > >> > > > > >> >>>
> > > >>>>> > > > >> > > > > >> >>>> a few extra points:
> > > >>>>> > > > >> > > > > >> >>>>
> > > >>>>> > > > >> > > > > >> >>>> 1. broker per disk might also incur more
> > > >>>>> client <-->
> > > >>>>> > > > >> broker
> > > >>>>> > > > >> > > sockets:
> > > >>>>> > > > >> > > > > >> >>>> suppose every producer / consumer
> "talks"
> > to
> > > >>>>> >1
> > > >>>>> > > > partition,
> > > >>>>> > > > >> > > there's a
> > > >>>>> > > > >> > > > > >> >> very
> > > >>>>> > > > >> > > > > >> >>>> good chance that partitions that were
> > > >>>>> co-located on
> > > >>>>> > a
> > > >>>>> > > > >> single
> > > >>>>> > > > >> > > 10-disk
> > > >>>>> > > > >> > > > > >> >> broker
> > > >>>>> > > > >> > > > > >> >>>> would now be split between several
> > > single-disk
> > > >>>>> > broker
> > > >>>>> > > > >> > > processes on
> > > >>>>> > > > >> > > > > >> the
> > > >>>>> > > > >> > > > > >> >> same
> > > >>>>> > > > >> > > > > >> >>>> machine. hard to put a multiplier on
> this,
> > > but
> > > >>>>> > likely
> > > >>>>> > > > >x1.
> > > >>>>> > > > >> > > sockets
> > > >>>>> > > > >> > > > > >> are a
> > > >>>>> > > > >> > > > > >> >>>> limited resource at the OS level and
> incur
> > > >>>>> some
> > > >>>>> > memory
> > > >>>>> > > > >> cost
> > > >>>>> > > > >> > > (kernel
> > > >>>>> > > > >> > > > > >> >>>> buffers)
> > > >>>>> > > > >> > > > > >> >>>>
> > > >>>>> > > > >> > > > > >> >>>> 2. there's a memory overhead to spinning
> > up
> > > a
> > > >>>>> JVM
> > > >>>>> > > > >> (compiled
> > > >>>>> > > > >> > > code and
> > > >>>>> > > > >> > > > > >> >> byte
> > > >>>>> > > > >> > > > > >> >>>> code objects etc). if we assume this
> > > overhead
> > > >>>>> is
> > > >>>>> > ~300
> > > >>>>> > > MB
> > > >>>>> > > > >> > > (order of
> > > >>>>> > > > >> > > > > >> >>>> magnitude, specifics vary) than spinning
> > up
> > > >>>>> 10 JVMs
> > > >>>>> > > > would
> > > >>>>> > > > >> lose
> > > >>>>> > > > >> > > you 3
> > > >>>>> > > > >> > > > > >> GB
> > > >>>>> > > > >> > > > > >> >> of
> > > >>>>> > > > >> > > > > >> >>>> RAM. not a ton, but non negligible.
> > > >>>>> > > > >> > > > > >> >>>>
> > > >>>>> > > > >> > > > > >> >>>> 3. there would also be some overhead
> > > >>>>> downstream of
> > > >>>>> > > kafka
> > > >>>>> > > > >> in any
> > > >>>>> > > > >> > > > > >> >> management
> > > >>>>> > > > >> > > > > >> >>>> / monitoring / log aggregation system.
> > > likely
> > > >>>>> less
> > > >>>>> > > than
> > > >>>>> > > > >> x10
> > > >>>>> > > > >> > > though.
> > > >>>>> > > > >> > > > > >> >>>>
> > > >>>>> > > > >> > > > > >> >>>> 4. (related to above) - added complexity
> > of
> > > >>>>> > > > administration
> > > >>>>> > > > >> > > with more
> > > >>>>> > > > >> > > > > >> >>>> running instances.
> > > >>>>> > > > >> > > > > >> >>>>
> > > >>>>> > > > >> > > > > >> >>>> is anyone running kafka with anywhere
> near
> > > >>>>> 100GB
> > > >>>>> > > heaps?
> > > >>>>> > > > i
> > > >>>>> > > > >> > > thought the
> > > >>>>> > > > >> > > > > >> >> point
> > > >>>>> > > > >> > > > > >> >>>> was to rely on kernel page cache to do
> the
> > > >>>>> disk
> > > >>>>> > > > buffering
> > > >>>>> > > > >> ....
> > > >>>>> > > > >> > > > > >> >>>>
> > > >>>>> > > > >> > > > > >> >>>> On Thu, Jan 26, 2017 at 11:00 AM, Dong
> > Lin <
> > > >>>>> > > > >> > > lindon...@gmail.com>
> > > >>>>> > > > >> > > > > >> wrote:
> > > >>>>> > > > >> > > > > >> >>>>
> > > >>>>> > > > >> > > > > >> >>>>> Hey Colin,
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>> Thanks much for the comment. Please see
> > me
> > > >>>>> comment
> > > >>>>> > > > >> inline.
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>> On Thu, Jan 26, 2017 at 10:15 AM, Colin
> > > >>>>> McCabe <
> > > >>>>> > > > >> > > cmcc...@apache.org>
> > > >>>>> > > > >> > > > > >> >>>> wrote:
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>>> On Wed, Jan 25, 2017, at 13:50, Dong
> Lin
> > > >>>>> wrote:
> > > >>>>> > > > >> > > > > >> >>>>>>> Hey Colin,
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> Good point! Yeah we have actually
> > > >>>>> considered and
> > > >>>>> > > > >> tested this
> > > >>>>> > > > >> > > > > >> >>>> solution,
> > > >>>>> > > > >> > > > > >> >>>>>>> which we call one-broker-per-disk. It
> > > >>>>> would work
> > > >>>>> > > and
> > > >>>>> > > > >> should
> > > >>>>> > > > >> > > > > >> require
> > > >>>>> > > > >> > > > > >> >>>> no
> > > >>>>> > > > >> > > > > >> >>>>>>> major change in Kafka as compared to
> > this
> > > >>>>> JBOD
> > > >>>>> > KIP.
> > > >>>>> > > > So
> > > >>>>> > > > >> it
> > > >>>>> > > > >> > > would
> > > >>>>> > > > >> > > > > >> be a
> > > >>>>> > > > >> > > > > >> >>>>> good
> > > >>>>> > > > >> > > > > >> >>>>>>> short term solution.
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> But it has a few drawbacks which
> makes
> > it
> > > >>>>> less
> > > >>>>> > > > >> desirable in
> > > >>>>> > > > >> > > the
> > > >>>>> > > > >> > > > > >> long
> > > >>>>> > > > >> > > > > >> >>>>>>> term.
> > > >>>>> > > > >> > > > > >> >>>>>>> Assume we have 10 disks on a machine.
> > > Here
> > > >>>>> are
> > > >>>>> > the
> > > >>>>> > > > >> problems:
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>> Hi Dong,
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>> Thanks for the thoughtful reply.
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> 1) Our stress test result shows that
> > > >>>>> > > > >> one-broker-per-disk
> > > >>>>> > > > >> > > has 15%
> > > >>>>> > > > >> > > > > >> >>>> lower
> > > >>>>> > > > >> > > > > >> >>>>>>> throughput
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> 2) Controller would need to send 10X
> as
> > > >>>>> many
> > > >>>>> > > > >> > > LeaderAndIsrRequest,
> > > >>>>> > > > >> > > > > >> >>>>>>> MetadataUpdateRequest and
> > > >>>>> StopReplicaRequest.
> > > >>>>> > This
> > > >>>>> > > > >> > > increases the
> > > >>>>> > > > >> > > > > >> >>>> burden
> > > >>>>> > > > >> > > > > >> >>>>>>> on
> > > >>>>> > > > >> > > > > >> >>>>>>> controller which can be the
> performance
> > > >>>>> > bottleneck.
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>> Maybe I'm misunderstanding something,
> > but
> > > >>>>> there
> > > >>>>> > > would
> > > >>>>> > > > >> not be
> > > >>>>> > > > >> > > 10x as
> > > >>>>> > > > >> > > > > >> >>>> many
> > > >>>>> > > > >> > > > > >> >>>>>> StopReplicaRequest RPCs, would there?
> > The
> > > >>>>> other
> > > >>>>> > > > >> requests
> > > >>>>> > > > >> > > would
> > > >>>>> > > > >> > > > > >> >>>> increase
> > > >>>>> > > > >> > > > > >> >>>>>> 10x, but from a pretty low base,
> right?
> > > We
> > > >>>>> are
> > > >>>>> > not
> > > >>>>> > > > >> > > reassigning
> > > >>>>> > > > >> > > > > >> >>>>>> partitions all the time, I hope (or
> else
> > > we
> > > >>>>> have
> > > >>>>> > > > bigger
> > > >>>>> > > > >> > > > > >> problems...)
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>> I think the controller will group
> > > >>>>> > StopReplicaRequest
> > > >>>>> > > > per
> > > >>>>> > > > >> > > broker and
> > > >>>>> > > > >> > > > > >> >> send
> > > >>>>> > > > >> > > > > >> >>>>> only one StopReplicaRequest to a broker
> > > >>>>> during
> > > >>>>> > > > controlled
> > > >>>>> > > > >> > > shutdown.
> > > >>>>> > > > >> > > > > >> >>>> Anyway,
> > > >>>>> > > > >> > > > > >> >>>>> we don't have to worry about this if we
> > > >>>>> agree that
> > > >>>>> > > > other
> > > >>>>> > > > >> > > requests
> > > >>>>> > > > >> > > > > >> will
> > > >>>>> > > > >> > > > > >> >>>>> increase by 10X. One MetadataRequest to
> > > send
> > > >>>>> to
> > > >>>>> > each
> > > >>>>> > > > >> broker
> > > >>>>> > > > >> > > in the
> > > >>>>> > > > >> > > > > >> >>>> cluster
> > > >>>>> > > > >> > > > > >> >>>>> every time there is leadership change.
> I
> > am
> > > >>>>> not
> > > >>>>> > sure
> > > >>>>> > > > >> this is
> > > >>>>> > > > >> > > a real
> > > >>>>> > > > >> > > > > >> >>>>> problem. But in theory this makes the
> > > >>>>> overhead
> > > >>>>> > > > complexity
> > > >>>>> > > > >> > > O(number
> > > >>>>> > > > >> > > > > >> of
> > > >>>>> > > > >> > > > > >> >>>>> broker) and may be a concern in the
> > future.
> > > >>>>> Ideally
> > > >>>>> > > we
> > > >>>>> > > > >> should
> > > >>>>> > > > >> > > avoid
> > > >>>>> > > > >> > > > > >> it.
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> 3) Less efficient use of physical
> > > resource
> > > >>>>> on the
> > > >>>>> > > > >> machine.
> > > >>>>> > > > >> > > The
> > > >>>>> > > > >> > > > > >> number
> > > >>>>> > > > >> > > > > >> >>>>> of
> > > >>>>> > > > >> > > > > >> >>>>>>> socket on each machine will increase
> by
> > > >>>>> 10X. The
> > > >>>>> > > > >> number of
> > > >>>>> > > > >> > > > > >> connection
> > > >>>>> > > > >> > > > > >> >>>>>>> between any two machine will increase
> > by
> > > >>>>> 100X.
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> 4) Less efficient way to management
> > > memory
> > > >>>>> and
> > > >>>>> > > quota.
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> 5) Rebalance between disks/brokers on
> > the
> > > >>>>> same
> > > >>>>> > > > machine
> > > >>>>> > > > >> will
> > > >>>>> > > > >> > > less
> > > >>>>> > > > >> > > > > >> >>>>>>> efficient
> > > >>>>> > > > >> > > > > >> >>>>>>> and less flexible. Broker has to read
> > > data
> > > >>>>> from
> > > >>>>> > > > another
> > > >>>>> > > > >> > > broker on
> > > >>>>> > > > >> > > > > >> the
> > > >>>>> > > > >> > > > > >> >>>>>>> same
> > > >>>>> > > > >> > > > > >> >>>>>>> machine via socket. It is also harder
> > to
> > > do
> > > >>>>> > > automatic
> > > >>>>> > > > >> load
> > > >>>>> > > > >> > > balance
> > > >>>>> > > > >> > > > > >> >>>>>>> between
> > > >>>>> > > > >> > > > > >> >>>>>>> disks on the same machine in the
> > future.
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> I will put this and the explanation
> in
> > > the
> > > >>>>> > rejected
> > > >>>>> > > > >> > > alternative
> > > >>>>> > > > >> > > > > >> >>>>> section.
> > > >>>>> > > > >> > > > > >> >>>>>>> I
> > > >>>>> > > > >> > > > > >> >>>>>>> have a few questions:
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> - Can you explain why this solution
> can
> > > >>>>> help
> > > >>>>> > avoid
> > > >>>>> > > > >> > > scalability
> > > >>>>> > > > >> > > > > >> >>>>>>> bottleneck?
> > > >>>>> > > > >> > > > > >> >>>>>>> I actually think it will exacerbate
> the
> > > >>>>> > scalability
> > > >>>>> > > > >> problem
> > > >>>>> > > > >> > > due
> > > >>>>> > > > >> > > > > >> the
> > > >>>>> > > > >> > > > > >> >>>> 2)
> > > >>>>> > > > >> > > > > >> >>>>>>> above.
> > > >>>>> > > > >> > > > > >> >>>>>>> - Why can we push more RPC with this
> > > >>>>> solution?
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>> To really answer this question we'd
> have
> > > to
> > > >>>>> take a
> > > >>>>> > > > deep
> > > >>>>> > > > >> dive
> > > >>>>> > > > >> > > into
> > > >>>>> > > > >> > > > > >> the
> > > >>>>> > > > >> > > > > >> >>>>>> locking of the broker and figure out
> how
> > > >>>>> > effectively
> > > >>>>> > > > it
> > > >>>>> > > > >> can
> > > >>>>> > > > >> > > > > >> >> parallelize
> > > >>>>> > > > >> > > > > >> >>>>>> truly independent requests.  Almost
> > every
> > > >>>>> > > > multithreaded
> > > >>>>> > > > >> > > process is
> > > >>>>> > > > >> > > > > >> >>>> going
> > > >>>>> > > > >> > > > > >> >>>>>> to have shared state, like shared
> queues
> > > or
> > > >>>>> shared
> > > >>>>> > > > >> sockets,
> > > >>>>> > > > >> > > that is
> > > >>>>> > > > >> > > > > >> >>>>>> going to make scaling less than linear
> > > when
> > > >>>>> you
> > > >>>>> > add
> > > >>>>> > > > >> disks or
> > > >>>>> > > > >> > > > > >> >>>> processors.
> > > >>>>> > > > >> > > > > >> >>>>>> (And clearly, another option is to
> > improve
> > > >>>>> that
> > > >>>>> > > > >> scalability,
> > > >>>>> > > > >> > > rather
> > > >>>>> > > > >> > > > > >> >>>>>> than going multi-process!)
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>> Yeah I also think it is better to
> improve
> > > >>>>> > scalability
> > > >>>>> > > > >> inside
> > > >>>>> > > > >> > > kafka
> > > >>>>> > > > >> > > > > >> code
> > > >>>>> > > > >> > > > > >> >>>> if
> > > >>>>> > > > >> > > > > >> >>>>> possible. I am not sure we currently
> have
> > > any
> > > >>>>> > > > scalability
> > > >>>>> > > > >> > > issue
> > > >>>>> > > > >> > > > > >> inside
> > > >>>>> > > > >> > > > > >> >>>>> Kafka that can not be removed without
> > using
> > > >>>>> > > > >> multi-process.
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> - It is true that a garbage
> collection
> > in
> > > >>>>> one
> > > >>>>> > > broker
> > > >>>>> > > > >> would
> > > >>>>> > > > >> > > not
> > > >>>>> > > > >> > > > > >> affect
> > > >>>>> > > > >> > > > > >> >>>>>>> others. But that is after every
> broker
> > > >>>>> only uses
> > > >>>>> > > 1/10
> > > >>>>> > > > >> of the
> > > >>>>> > > > >> > > > > >> memory.
> > > >>>>> > > > >> > > > > >> >>>>> Can
> > > >>>>> > > > >> > > > > >> >>>>>>> we be sure that this will actually
> help
> > > >>>>> > > performance?
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>> The big question is, how much memory
> do
> > > >>>>> Kafka
> > > >>>>> > > brokers
> > > >>>>> > > > >> use
> > > >>>>> > > > >> > > now, and
> > > >>>>> > > > >> > > > > >> how
> > > >>>>> > > > >> > > > > >> >>>>>> much will they use in the future?  Our
> > > >>>>> experience
> > > >>>>> > in
> > > >>>>> > > > >> HDFS
> > > >>>>> > > > >> > > was that
> > > >>>>> > > > >> > > > > >> >> once
> > > >>>>> > > > >> > > > > >> >>>>>> you start getting more than 100-200GB
> > Java
> > > >>>>> heap
> > > >>>>> > > sizes,
> > > >>>>> > > > >> full
> > > >>>>> > > > >> > > GCs
> > > >>>>> > > > >> > > > > >> start
> > > >>>>> > > > >> > > > > >> >>>>>> taking minutes to finish when using
> the
> > > >>>>> standard
> > > >>>>> > > JVMs.
> > > >>>>> > > > >> That
> > > >>>>> > > > >> > > alone
> > > >>>>> > > > >> > > > > >> is
> > > >>>>> > > > >> > > > > >> >> a
> > > >>>>> > > > >> > > > > >> >>>>>> good reason to go multi-process or
> > > consider
> > > >>>>> > storing
> > > >>>>> > > > more
> > > >>>>> > > > >> > > things off
> > > >>>>> > > > >> > > > > >> >> the
> > > >>>>> > > > >> > > > > >> >>>>>> Java heap.
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>> I see. Now I agree one-broker-per-disk
> > > >>>>> should be
> > > >>>>> > more
> > > >>>>> > > > >> > > efficient in
> > > >>>>> > > > >> > > > > >> >> terms
> > > >>>>> > > > >> > > > > >> >>>> of
> > > >>>>> > > > >> > > > > >> >>>>> GC since each broker probably needs
> less
> > > >>>>> than 1/10
> > > >>>>> > of
> > > >>>>> > > > the
> > > >>>>> > > > >> > > memory
> > > >>>>> > > > >> > > > > >> >>>> available
> > > >>>>> > > > >> > > > > >> >>>>> on a typical machine nowadays. I will
> > > remove
> > > >>>>> this
> > > >>>>> > > from
> > > >>>>> > > > >> the
> > > >>>>> > > > >> > > reason of
> > > >>>>> > > > >> > > > > >> >>>>> rejection.
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>> Disk failure is the "easy" case.  The
> > > >>>>> "hard" case,
> > > >>>>> > > > >> which is
> > > >>>>> > > > >> > > > > >> >>>>>> unfortunately also the much more
> common
> > > >>>>> case, is
> > > >>>>> > > disk
> > > >>>>> > > > >> > > misbehavior.
> > > >>>>> > > > >> > > > > >> >>>>>> Towards the end of their lives, disks
> > tend
> > > >>>>> to
> > > >>>>> > start
> > > >>>>> > > > >> slowing
> > > >>>>> > > > >> > > down
> > > >>>>> > > > >> > > > > >> >>>>>> unpredictably.  Requests that would
> have
> > > >>>>> completed
> > > >>>>> > > > >> > > immediately
> > > >>>>> > > > >> > > > > >> before
> > > >>>>> > > > >> > > > > >> >>>>>> start taking 20, 100 500 milliseconds.
> > > >>>>> Some files
> > > >>>>> > > may
> > > >>>>> > > > >> be
> > > >>>>> > > > >> > > readable
> > > >>>>> > > > >> > > > > >> and
> > > >>>>> > > > >> > > > > >> >>>>>> other files may not be.  System calls
> > > hang,
> > > >>>>> > > sometimes
> > > >>>>> > > > >> > > forever, and
> > > >>>>> > > > >> > > > > >> the
> > > >>>>> > > > >> > > > > >> >>>>>> Java process can't abort them, because
> > the
> > > >>>>> hang is
> > > >>>>> > > in
> > > >>>>> > > > >> the
> > > >>>>> > > > >> > > kernel.
> > > >>>>> > > > >> > > > > >> It
> > > >>>>> > > > >> > > > > >> >>>> is
> > > >>>>> > > > >> > > > > >> >>>>>> not fun when threads are stuck in "D
> > > state"
> > > >>>>> > > > >> > > > > >> >>>>>> http://stackoverflow.com/quest
> > > >>>>> > > > >> ions/20423521/process-perminan
> > > >>>>> > > > >> > > > > >> >>>>>> tly-stuck-on-d-state
> > > >>>>> > > > >> > > > > >> >>>>>> .  Even kill -9 cannot abort the
> thread
> > > >>>>> then.
> > > >>>>> > > > >> Fortunately,
> > > >>>>> > > > >> > > this is
> > > >>>>> > > > >> > > > > >> >>>>>> rare.
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>> I agree it is a harder problem and it
> is
> > > >>>>> rare. We
> > > >>>>> > > > >> probably
> > > >>>>> > > > >> > > don't
> > > >>>>> > > > >> > > > > >> have
> > > >>>>> > > > >> > > > > >> >> to
> > > >>>>> > > > >> > > > > >> >>>>> worry about it in this KIP since this
> > issue
> > > >>>>> is
> > > >>>>> > > > >> orthogonal to
> > > >>>>> > > > >> > > > > >> whether or
> > > >>>>> > > > >> > > > > >> >>>> not
> > > >>>>> > > > >> > > > > >> >>>>> we use JBOD.
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>> Another approach we should consider is
> > for
> > > >>>>> Kafka
> > > >>>>> > to
> > > >>>>> > > > >> > > implement its
> > > >>>>> > > > >> > > > > >> own
> > > >>>>> > > > >> > > > > >> >>>>>> storage layer that would stripe across
> > > >>>>> multiple
> > > >>>>> > > disks.
> > > >>>>> > > > >> This
> > > >>>>> > > > >> > > > > >> wouldn't
> > > >>>>> > > > >> > > > > >> >>>>>> have to be done at the block level,
> but
> > > >>>>> could be
> > > >>>>> > > done
> > > >>>>> > > > >> at the
> > > >>>>> > > > >> > > file
> > > >>>>> > > > >> > > > > >> >>>> level.
> > > >>>>> > > > >> > > > > >> >>>>>> We could use consistent hashing to
> > > >>>>> determine which
> > > >>>>> > > > >> disks a
> > > >>>>> > > > >> > > file
> > > >>>>> > > > >> > > > > >> should
> > > >>>>> > > > >> > > > > >> >>>>>> end up on, for example.
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>> Are you suggesting that we should
> > > distribute
> > > >>>>> log,
> > > >>>>> > or
> > > >>>>> > > > log
> > > >>>>> > > > >> > > segment,
> > > >>>>> > > > >> > > > > >> >> across
> > > >>>>> > > > >> > > > > >> >>>>> disks of brokers? I am not sure if I
> > fully
> > > >>>>> > understand
> > > >>>>> > > > >> this
> > > >>>>> > > > >> > > > > >> approach. My
> > > >>>>> > > > >> > > > > >> >>>> gut
> > > >>>>> > > > >> > > > > >> >>>>> feel is that this would be a drastic
> > > >>>>> solution that
> > > >>>>> > > > would
> > > >>>>> > > > >> > > require
> > > >>>>> > > > >> > > > > >> >>>>> non-trivial design. While this may be
> > > useful
> > > >>>>> to
> > > >>>>> > > Kafka,
> > > >>>>> > > > I
> > > >>>>> > > > >> would
> > > >>>>> > > > >> > > > > >> prefer
> > > >>>>> > > > >> > > > > >> >> not
> > > >>>>> > > > >> > > > > >> >>>>> to discuss this in detail in this
> thread
> > > >>>>> unless you
> > > >>>>> > > > >> believe
> > > >>>>> > > > >> > > it is
> > > >>>>> > > > >> > > > > >> >>>> strictly
> > > >>>>> > > > >> > > > > >> >>>>> superior to the design in this KIP in
> > terms
> > > >>>>> of
> > > >>>>> > > solving
> > > >>>>> > > > >> our
> > > >>>>> > > > >> > > use-case.
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>>> best,
> > > >>>>> > > > >> > > > > >> >>>>>> Colin
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> Thanks,
> > > >>>>> > > > >> > > > > >> >>>>>>> Dong
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>> On Wed, Jan 25, 2017 at 11:34 AM,
> Colin
> > > >>>>> McCabe <
> > > >>>>> > > > >> > > > > >> cmcc...@apache.org>
> > > >>>>> > > > >> > > > > >> >>>>>>> wrote:
> > > >>>>> > > > >> > > > > >> >>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>> Hi Dong,
> > > >>>>> > > > >> > > > > >> >>>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>> Thanks for the writeup!  It's very
> > > >>>>> interesting.
> > > >>>>> > > > >> > > > > >> >>>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>> I apologize in advance if this has
> > been
> > > >>>>> > discussed
> > > >>>>> > > > >> > > somewhere else.
> > > >>>>> > > > >> > > > > >> >>>>> But
> > > >>>>> > > > >> > > > > >> >>>>>> I
> > > >>>>> > > > >> > > > > >> >>>>>>>> am curious if you have considered
> the
> > > >>>>> solution
> > > >>>>> > of
> > > >>>>> > > > >> running
> > > >>>>> > > > >> > > > > >> multiple
> > > >>>>> > > > >> > > > > >> >>>>>>>> brokers per node.  Clearly there is
> a
> > > >>>>> memory
> > > >>>>> > > > overhead
> > > >>>>> > > > >> with
> > > >>>>> > > > >> > > this
> > > >>>>> > > > >> > > > > >> >>>>>> solution
> > > >>>>> > > > >> > > > > >> >>>>>>>> because of the fixed cost of
> starting
> > > >>>>> multiple
> > > >>>>> > > JVMs.
> > > >>>>> > > > >> > > However,
> > > >>>>> > > > >> > > > > >> >>>>> running
> > > >>>>> > > > >> > > > > >> >>>>>>>> multiple JVMs would help avoid
> > > scalability
> > > >>>>> > > > >> bottlenecks.
> > > >>>>> > > > >> > > You
> > > >>>>> > > > >> > > > > >> could
> > > >>>>> > > > >> > > > > >> >>>>>>>> probably push more RPCs per second,
> > for
> > > >>>>> example.
> > > >>>>> > > A
> > > >>>>> > > > >> garbage
> > > >>>>> > > > >> > > > > >> >>>>> collection
> > > >>>>> > > > >> > > > > >> >>>>>>>> in one broker would not affect the
> > > >>>>> others.  It
> > > >>>>> > > would
> > > >>>>> > > > >> be
> > > >>>>> > > > >> > > > > >> interesting
> > > >>>>> > > > >> > > > > >> >>>>> to
> > > >>>>> > > > >> > > > > >> >>>>>>>> see this considered in the
> "alternate
> > > >>>>> designs"
> > > >>>>> > > > design,
> > > >>>>> > > > >> > > even if
> > > >>>>> > > > >> > > > > >> you
> > > >>>>> > > > >> > > > > >> >>>>> end
> > > >>>>> > > > >> > > > > >> >>>>>>>> up deciding it's not the way to go.
> > > >>>>> > > > >> > > > > >> >>>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>> best,
> > > >>>>> > > > >> > > > > >> >>>>>>>> Colin
> > > >>>>> > > > >> > > > > >> >>>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>> On Thu, Jan 12, 2017, at 10:46, Dong
> > Lin
> > > >>>>> wrote:
> > > >>>>> > > > >> > > > > >> >>>>>>>>> Hi all,
> > > >>>>> > > > >> > > > > >> >>>>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>>> We created KIP-112: Handle disk
> > failure
> > > >>>>> for
> > > >>>>> > JBOD.
> > > >>>>> > > > >> Please
> > > >>>>> > > > >> > > find
> > > >>>>> > > > >> > > > > >> the
> > > >>>>> > > > >> > > > > >> >>>>> KIP
> > > >>>>> > > > >> > > > > >> >>>>>>>>> wiki
> > > >>>>> > > > >> > > > > >> >>>>>>>>> in the link
> > > >>>>> https://cwiki.apache.org/confl
> > > >>>>> > > > >> > > > > >> >>>> uence/display/KAFKA/KIP-
> > > >>>>> > > > >> > > > > >> >>>>>>>>> 112%3A+Handle+disk+failure+
> for+JBOD.
> > > >>>>> > > > >> > > > > >> >>>>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>>> This KIP is related to KIP-113
> > > >>>>> > > > >> > > > > >> >>>>>>>>> <https://cwiki.apache.org/conf
> > > >>>>> > > > >> luence/display/KAFKA/KIP-
> > > >>>>> > > > >> > > > > >> >>>>>>>> 113%3A+Support+replicas+moveme
> > > >>>>> > > > >> nt+between+log+directories>:
> > > >>>>> > > > >> > > > > >> >>>>>>>>> Support replicas movement between
> log
> > > >>>>> > > directories.
> > > >>>>> > > > >> They
> > > >>>>> > > > >> > > are
> > > >>>>> > > > >> > > > > >> >>>> needed
> > > >>>>> > > > >> > > > > >> >>>>> in
> > > >>>>> > > > >> > > > > >> >>>>>>>>> order
> > > >>>>> > > > >> > > > > >> >>>>>>>>> to support JBOD in Kafka. Please
> help
> > > >>>>> review
> > > >>>>> > the
> > > >>>>> > > > >> KIP. You
> > > >>>>> > > > >> > > > > >> >>>> feedback
> > > >>>>> > > > >> > > > > >> >>>>> is
> > > >>>>> > > > >> > > > > >> >>>>>>>>> appreciated!
> > > >>>>> > > > >> > > > > >> >>>>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>>>> Thanks,
> > > >>>>> > > > >> > > > > >> >>>>>>>>> Dong
> > > >>>>> > > > >> > > > > >> >>>>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>>
> > > >>>>> > > > >> > > > > >> >>>>>
> > > >>>>> > > > >> > > > > >> >>>>
> > > >>>>> > > > >> > > > > >> >>
> > > >>>>> > > > >> > > > > >> >>
> > > >>>>> > > > >> > > > > >>
> > > >>>>> > > > >> > > > > >>
> > > >>>>> > > > >> > > > > >
> > > >>>>> > > > >> > >
> > > >>>>> > > > >>
> > > >>>>> > > > >
> > > >>>>> > > > >
> > > >>>>> > > >
> > > >>>>> > >
> > > >>>>> >
> > > >>>>>
> > > >>>>
> > > >>>>
> > > >>>
> > > >>
> > > >
> > >
> >
>

Reply via email to